adnanhemani commented on code in PR #1844: URL: https://github.com/apache/polaris/pull/1844#discussion_r2308704688
########## polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.core.entity; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Map; + +public class PolarisEvent { + public static final String EMPTY_MAP_STRING = "{}"; + + // to serialize/deserialize properties + private static final ObjectMapper MAPPER = new ObjectMapper(); + + // catalog id + private String catalogId; + + // event id + private String id; + + // id of the request that generated this event + private String requestId; + + // event type that was fired + private String eventType; + + // timestamp in epoch milliseconds of when this event was emitted + private long timestampMs; + + // polaris principal who took this action + private String principalName; + + // Enum that states the type of resource was being operated on + private ResourceType resourceType; + + // Which resource was operated on + private String resourceIdentifier; + + // Additional parameters that were not earlier 
recorded + private String additionalProperties; Review Comment: I believe it has to do with the serializability of the record in and out of the database. But I'll readily admit, I did not test this and am just following the existing patterns in other similar entities. Maybe someone should do this in the future and if possible, change this for all entities. ########## runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java: ########## @@ -26,10 +26,10 @@ * of this event relative to the validation checks we've performed, which means the commit may still * fail Polaris-side validation checks. * - * @param identifier The identifier. + * @param tableIdentifier The identifier. * @param base The old metadata. * @param metadata The new metadata. */ public record BeforeTableCommitedEvent( - TableIdentifier identifier, TableMetadata base, TableMetadata metadata) + String eventId, TableIdentifier tableIdentifier, TableMetadata base, TableMetadata metadata) Review Comment: Changed! ########## runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java: ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.polaris.service.events.listeners; + +import java.util.Map; +import org.apache.iceberg.TableMetadataParser; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.service.events.AfterCatalogCreatedEvent; +import org.apache.polaris.service.events.AfterTableCommitedEvent; +import org.apache.polaris.service.events.AfterTableCreatedEvent; +import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.AfterTaskAttemptedEvent; +import org.apache.polaris.service.events.AfterViewCommitedEvent; +import org.apache.polaris.service.events.AfterViewRefreshedEvent; +import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.BeforeTableCommitedEvent; +import org.apache.polaris.service.events.BeforeTableRefreshedEvent; +import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.BeforeViewCommitedEvent; +import org.apache.polaris.service.events.BeforeViewRefreshedEvent; + +public abstract class PolarisPersistenceEventListener extends PolarisEventListener { + @Override + public final void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent event) {} + + @Override + public void onBeforeTableCommited(BeforeTableCommitedEvent event) {} + + @Override + public void onAfterTableCommited(AfterTableCommitedEvent event) {} + + @Override + public void onBeforeViewCommited(BeforeViewCommitedEvent event) {} + + @Override + public void onAfterViewCommited(AfterViewCommitedEvent event) {} + + @Override + public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) {} + + @Override + public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {} + + @Override + public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) {} + + @Override + public void onAfterViewRefreshed(AfterViewRefreshedEvent event) {} + + @Override + public void 
onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {} + + @Override + public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {} + + @Override + public void onAfterTableCreated(AfterTableCreatedEvent event) { Review Comment: I added this in a comment from a previous review but I am planning to add almost all events eventually to this (basically all that you see except the Request Rate Limited ones). I will add a TODO to this code to make this apparent. ########## runtime/service/src/main/java/org/apache/polaris/service/events/AfterCatalogCreatedEvent.java: ########## @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events; + +/** Emitted Polaris creates a catalog. */ +public record AfterCatalogCreatedEvent(String eventId, String catalogName) Review Comment: Yes, ideally the other instrumentation PRs I have in my fork (which I will surface to the main repo shortly) will take care of this. ########## polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.core.entity; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Map; + +public class PolarisEvent { + public static final String EMPTY_MAP_STRING = "{}"; + + // to serialize/deserialize properties + private static final ObjectMapper MAPPER = new ObjectMapper(); Review Comment: Added! ########## runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferEventListenerConfiguration.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.listeners; + +import io.quarkus.runtime.annotations.StaticInitSafe; +import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithDefault; +import io.smallrye.config.WithName; +import java.time.Duration; + +@StaticInitSafe +@ConfigMapping(prefix = "polaris.event-listener.persistence-in-memory-buffer") Review Comment: I was trying to say this is an event listener that sinks events into the persistence layer using an in-memory buffer. I know this makes it longer, but would something like `persistence-sink-through-in-memory-buffer` make it more clear? I'm afraid that `in-memory` doesn't actually capture the goal of the event listener, which is to sink the events into the persistence layer. ########## runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.listeners; + +import java.util.Map; +import org.apache.iceberg.TableMetadataParser; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.service.events.AfterCatalogCreatedEvent; +import org.apache.polaris.service.events.AfterTableCommitedEvent; +import org.apache.polaris.service.events.AfterTableCreatedEvent; +import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.AfterTaskAttemptedEvent; +import org.apache.polaris.service.events.AfterViewCommitedEvent; +import org.apache.polaris.service.events.AfterViewRefreshedEvent; +import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.BeforeTableCommitedEvent; +import org.apache.polaris.service.events.BeforeTableRefreshedEvent; +import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.BeforeViewCommitedEvent; +import org.apache.polaris.service.events.BeforeViewRefreshedEvent; + +public abstract class PolarisPersistenceEventListener extends PolarisEventListener { + + // TODO: Ensure all events (except RateLimiter ones) call `addToBuffer` + @Override + public final void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent event) {} + + @Override + public void onBeforeTableCommited(BeforeTableCommitedEvent event) {} + + @Override + public void onAfterTableCommited(AfterTableCommitedEvent event) {} + + @Override + public void onBeforeViewCommited(BeforeViewCommitedEvent 
event) {} + + @Override + public void onAfterViewCommited(AfterViewCommitedEvent event) {} + + @Override + public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) {} + + @Override + public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {} + + @Override + public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) {} + + @Override + public void onAfterViewRefreshed(AfterViewRefreshedEvent event) {} + + @Override + public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {} + + @Override + public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {} + + @Override + public void onAfterTableCreated(AfterTableCreatedEvent event) { + ContextSpecificInformation contextSpecificInformation = getContextSpecificInformation(); + org.apache.polaris.core.entity.PolarisEvent polarisEvent = + new org.apache.polaris.core.entity.PolarisEvent( + event.catalogName(), + event.eventId(), + getRequestId(), + event.getClass().getSimpleName(), + contextSpecificInformation.timestamp(), + contextSpecificInformation.principalName(), + PolarisEvent.ResourceType.TABLE, + event.identifier().toString()); + Map<String, String> additionalParameters = + Map.of( + "table-uuid", + event.metadata().uuid(), + "metadata", + TableMetadataParser.toJson(event.metadata())); + polarisEvent.setAdditionalProperties(additionalParameters); + addToBuffer(polarisEvent); + } + + @Override + public void onAfterCatalogCreated(AfterCatalogCreatedEvent event) { + ContextSpecificInformation contextSpecificInformation = getContextSpecificInformation(); + org.apache.polaris.core.entity.PolarisEvent polarisEvent = + new PolarisEvent( + event.catalogName(), + event.eventId(), + getRequestId(), + event.getClass().getSimpleName(), + contextSpecificInformation.timestamp(), + contextSpecificInformation.principalName(), + PolarisEvent.ResourceType.CATALOG, + event.catalogName()); + addToBuffer(polarisEvent); + } + + protected record ContextSpecificInformation(long timestamp, String 
principalName) {} + + abstract ContextSpecificInformation getContextSpecificInformation(); + + abstract String getRequestId(); + + abstract void addToBuffer(org.apache.polaris.core.entity.PolarisEvent event); Review Comment: Changed. On the topic of RateLimited events: I think that, by default, these events should not be processed by any OSS implementation of a listener that writes events to the persistence layer. If an implementation does want to persist this event, it should be free to override this method. ########## runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.SecurityContext; +import java.time.Clock; +import java.time.Duration; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + private static final String REQUEST_ID_KEY = "requestId"; + private final MetaStoreManagerFactory metaStoreManagerFactory; + + private final ConcurrentHashMap<String, ConcurrentLinkedQueue<EventAndContext>> buffer = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor; + private final ConcurrentHashMap<Future<?>, Integer> futures = new ConcurrentHashMap<>(); + private final Duration timeToFlush; + private final int maxBufferSize; + + @Inject CallContext callContext; + @Inject Clock clock; + @Context SecurityContext securityContext; + @Context ContainerRequestContext containerRequestContext; + + private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext callContext) {} + + @Inject + public InMemoryBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + Clock clock, + InMemoryBufferEventListenerConfiguration eventListenerConfiguration) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.clock = clock; + this.timeToFlush = eventListenerConfiguration.bufferTime(); + this.maxBufferSize = eventListenerConfiguration.maxBufferSize(); + + executor = Executors.newSingleThreadScheduledExecutor(); + } + + @PostConstruct + void start() { + futures.put( + executor.scheduleAtFixedRate( + this::runCleanup, 0, timeToFlush.toMillis(), TimeUnit.MILLISECONDS), + 1); + } + + void runCleanup() { + for (String realmId : buffer.keySet()) { Review Comment: This is how we can do a best-effort flush according to the `timeToFlush` set by the user. Was there a different way you were thinking of this? 
########## runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.SecurityContext; +import java.time.Clock; +import java.time.Duration; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import 
org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. */ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + private static final String REQUEST_ID_KEY = "requestId"; + private final MetaStoreManagerFactory metaStoreManagerFactory; + + private final ConcurrentHashMap<String, ConcurrentLinkedQueue<EventAndContext>> buffer = Review Comment: Unfortunately, I haven't found a different Queue implementation that satisfies all of the following constraints: * Thread-safe for multiple threads appending concurrently * Constant-time adds * No locking required on adds * Unbounded size. Since `ConcurrentLinkedQueue.size()` is not a constant-time operation (it traverses the queue), at the risk of over-engineering this, I will add a new class that keeps track of the size in constant time on a best-effort basis. The count may not always be exact (since it is not updated atomically with the queue), but IMO it should be good enough to decide whether or not to flush. ########## runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.SecurityContext; +import java.time.Clock; +import java.time.Duration; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + private static final String REQUEST_ID_KEY = "requestId"; + private final MetaStoreManagerFactory metaStoreManagerFactory; + + private final ConcurrentHashMap<String, ConcurrentLinkedQueue<EventAndContext>> buffer = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor; + private final ConcurrentHashMap<Future<?>, Integer> futures = new ConcurrentHashMap<>(); + private final Duration timeToFlush; + private final int maxBufferSize; + + @Inject CallContext callContext; + @Inject Clock clock; + @Context SecurityContext securityContext; + @Context ContainerRequestContext containerRequestContext; + + private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext callContext) {} + + @Inject + public InMemoryBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + Clock clock, + InMemoryBufferEventListenerConfiguration eventListenerConfiguration) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.clock = clock; + this.timeToFlush = eventListenerConfiguration.bufferTime(); + this.maxBufferSize = eventListenerConfiguration.maxBufferSize(); + + executor = Executors.newSingleThreadScheduledExecutor(); + } + + @PostConstruct + void start() { + futures.put( + executor.scheduleAtFixedRate( + this::runCleanup, 0, timeToFlush.toMillis(), TimeUnit.MILLISECONDS), + 1); + } + + void runCleanup() { + for (String realmId : buffer.keySet()) { + try { + checkAndFlushBufferIfNecessary(realmId, false); + } catch (Exception e) { + LOGGER.debug("Buffer checking task failed for realm ({}): {}", realmId, e); + } + } + // Clean up futures + try { + futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone()); + } catch (Exception e) { + LOGGER.debug("Futures reaper task failed."); + } + } + + @PreDestroy + void shutdown() { + futures.keySet().forEach(future -> future.cancel(false)); + executor.shutdownNow(); Review Comment: Updated to reflect this documentation. ########## runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java: ########## @@ -167,6 +173,8 @@ public Response createCatalog( validateExternalCatalog(catalog); Catalog newCatalog = CatalogEntity.of(adminService.createCatalog(request)).asCatalog(); LOGGER.info("Created new catalog {}", newCatalog); + polarisEventListener.onAfterCatalogCreated( Review Comment: That's correct. I will rebase this PR onto the Decorator PR before merging, and will then remove the old event instrumentation I introduced as part of this PR. I'm putting a note on the draft PRs about this! ########## runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCreatedEvent.java: ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.polaris.service.events; + +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.TableIdentifier; + +/** Emitted when Polaris creates a table. */ +public record AfterTableCreatedEvent( + String eventId, String catalogName, TableMetadata metadata, TableIdentifier identifier) Review Comment: 1. Good call, changed. 2. This is a good point as well - I was trying hard to not touch already existing events in this PR. But I think you're right, let me make these changes. ########## runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.SecurityContext; +import java.time.Clock; +import java.time.Duration; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + private static final String REQUEST_ID_KEY = "requestId"; + private final MetaStoreManagerFactory metaStoreManagerFactory; + + private final ConcurrentHashMap<String, ConcurrentLinkedQueue<EventAndContext>> buffer = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor; + private final ConcurrentHashMap<Future<?>, Integer> futures = new ConcurrentHashMap<>(); + private final Duration timeToFlush; + private final int maxBufferSize; + + @Inject CallContext callContext; + @Inject Clock clock; + @Context SecurityContext securityContext; + @Context ContainerRequestContext containerRequestContext; + + private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext callContext) {} Review Comment: Removed - based on below comment. ########## runtime/service/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java: ########## @@ -24,5 +24,5 @@ * @param method The request's HTTP method * @param absolutePath The request's absolute path */ -public record BeforeRequestRateLimitedEvent(String method, String absolutePath) +public record BeforeRequestRateLimitedEvent(String eventId, String method, String absolutePath) Review Comment: It currently is like this. I'm not opposed to making this change - but perhaps we don't make this change as part of this PR? WDYT? ########## polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java: ########## @@ -1959,4 +1960,14 @@ private List<PolarisBaseEntity> loadPoliciesFromMappingRecords( .collect(Collectors.toList()); return ms.lookupEntities(callCtx, policyEntityIds); } + + @Nonnull + @Override + public void writeEvents( Review Comment: Removed. 
########## runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.SecurityContext; +import java.time.Clock; +import java.time.Duration; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import 
org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. */ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + private static final String REQUEST_ID_KEY = "requestId"; + private final MetaStoreManagerFactory metaStoreManagerFactory; + + private final ConcurrentHashMap<String, ConcurrentLinkedQueue<EventAndContext>> buffer = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor; + private final ConcurrentHashMap<Future<?>, Integer> futures = new ConcurrentHashMap<>(); Review Comment: I didn't know this existed - thanks for showing this! ########## runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.SecurityContext; +import java.time.Clock; +import java.time.Duration; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + private static final String REQUEST_ID_KEY = "requestId"; + private final MetaStoreManagerFactory metaStoreManagerFactory; + + private final ConcurrentHashMap<String, ConcurrentLinkedQueue<EventAndContext>> buffer = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor; + private final ConcurrentHashMap<Future<?>, Integer> futures = new ConcurrentHashMap<>(); + private final Duration timeToFlush; + private final int maxBufferSize; + + @Inject CallContext callContext; + @Inject Clock clock; + @Context SecurityContext securityContext; + @Context ContainerRequestContext containerRequestContext; + + private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext callContext) {} + + @Inject + public InMemoryBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + Clock clock, + InMemoryBufferEventListenerConfiguration eventListenerConfiguration) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.clock = clock; + this.timeToFlush = eventListenerConfiguration.bufferTime(); + this.maxBufferSize = eventListenerConfiguration.maxBufferSize(); + + executor = Executors.newSingleThreadScheduledExecutor(); + } + + @PostConstruct + void start() { + futures.put( + executor.scheduleAtFixedRate( + this::runCleanup, 0, timeToFlush.toMillis(), TimeUnit.MILLISECONDS), + 1); + } + + void runCleanup() { + for (String realmId : buffer.keySet()) { + try { + checkAndFlushBufferIfNecessary(realmId, false); + } catch (Exception e) { + LOGGER.debug("Buffer checking task failed for realm ({}): {}", realmId, e); + } + } + // Clean up futures + try { + futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone()); + } catch (Exception e) { + LOGGER.debug("Futures reaper task failed."); + } + } + + @PreDestroy + void shutdown() { + futures.keySet().forEach(future -> future.cancel(false)); + executor.shutdownNow(); + + try { + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + LOGGER.warn("Executor did not shut down cleanly"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + for (String realmId : buffer.keySet()) { + try { + checkAndFlushBufferIfNecessary(realmId, true); + } catch (Exception e) { + LOGGER.debug("Buffer flushing task failed for realm ({}): ", realmId, e); + } + } + } + } + + @Override + String getRequestId() { + if (containerRequestContext != null && containerRequestContext.hasProperty(REQUEST_ID_KEY)) { + return (String) containerRequestContext.getProperty(REQUEST_ID_KEY); + } + return UUID.randomUUID().toString(); + } + + @Override + void addToBuffer(PolarisEvent polarisEvent) { + String realmId = callContext.getRealmContext().getRealmIdentifier(); + + buffer + .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>()) + .add(new EventAndContext(polarisEvent, callContext.getPolarisCallContext().copy())); + if (buffer.get(realmId).size() >= maxBufferSize) { + futures.put(executor.submit(() -> checkAndFlushBufferIfNecessary(realmId, true)), 1); + } + } + + @VisibleForTesting + void checkAndFlushBufferIfNecessary(String realmId, boolean forceFlush) { + ConcurrentLinkedQueue<EventAndContext> queue = buffer.get(realmId); + if (queue == null || queue.isEmpty()) { + return; + } + + EventAndContext head = queue.peek(); + if (head == null) { + return; + } + + Duration elapsed = Duration.ofMillis(clock.millis() - head.polarisEvent.getTimestampMs()); + + if (elapsed.compareTo(timeToFlush) > 0 || queue.size() >= maxBufferSize || forceFlush) { + // Atomically replace old queue with new queue + boolean replaced = buffer.replace(realmId, queue, new ConcurrentLinkedQueue<>()); + if (!replaced) { + // 
Another thread concurrently modified the buffer, so do not continue + return; + } + + metaStoreManagerFactory + .getOrCreateMetaStoreManager(() -> realmId) + .writeEvents( + head.callContext(), + new ArrayList<>(queue.stream().map(EventAndContext::polarisEvent).toList())); Review Comment: Removed. ########## runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.SecurityContext; +import java.time.Clock; +import java.time.Duration; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + private static final String REQUEST_ID_KEY = "requestId"; + private final MetaStoreManagerFactory metaStoreManagerFactory; + + private final ConcurrentHashMap<String, ConcurrentLinkedQueue<EventAndContext>> buffer = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor; + private final ConcurrentHashMap<Future<?>, Integer> futures = new ConcurrentHashMap<>(); + private final Duration timeToFlush; + private final int maxBufferSize; + + @Inject CallContext callContext; + @Inject Clock clock; + @Context SecurityContext securityContext; + @Context ContainerRequestContext containerRequestContext; + + private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext callContext) {} + + @Inject + public InMemoryBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + Clock clock, + InMemoryBufferEventListenerConfiguration eventListenerConfiguration) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.clock = clock; + this.timeToFlush = eventListenerConfiguration.bufferTime(); + this.maxBufferSize = eventListenerConfiguration.maxBufferSize(); + + executor = Executors.newSingleThreadScheduledExecutor(); + } + + @PostConstruct + void start() { + futures.put( + executor.scheduleAtFixedRate( + this::runCleanup, 0, timeToFlush.toMillis(), TimeUnit.MILLISECONDS), + 1); + } + + void runCleanup() { + for (String realmId : buffer.keySet()) { + try { + checkAndFlushBufferIfNecessary(realmId, false); + } catch (Exception e) { + LOGGER.debug("Buffer checking task failed for realm ({}): {}", realmId, e); + } + } + // Clean up futures + try { + futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone()); + } catch (Exception e) { + LOGGER.debug("Futures reaper task failed."); + } + } + + @PreDestroy + void shutdown() { + futures.keySet().forEach(future -> future.cancel(false)); + executor.shutdownNow(); + + try { + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + LOGGER.warn("Executor did not shut down cleanly"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + for (String realmId : buffer.keySet()) { + try { + checkAndFlushBufferIfNecessary(realmId, true); + } catch (Exception e) { + LOGGER.debug("Buffer flushing task failed for realm ({}): ", realmId, e); + } + } + } + } + + @Override + String getRequestId() { + if (containerRequestContext != null && containerRequestContext.hasProperty(REQUEST_ID_KEY)) { + return (String) containerRequestContext.getProperty(REQUEST_ID_KEY); + } + return UUID.randomUUID().toString(); + } + + @Override + void addToBuffer(PolarisEvent polarisEvent) { + String realmId = callContext.getRealmContext().getRealmIdentifier(); + + buffer + .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>()) + .add(new EventAndContext(polarisEvent, callContext.getPolarisCallContext().copy())); + if (buffer.get(realmId).size() >= maxBufferSize) { + futures.put(executor.submit(() -> checkAndFlushBufferIfNecessary(realmId, true)), 1); + } + } + + @VisibleForTesting + void checkAndFlushBufferIfNecessary(String realmId, boolean forceFlush) { + ConcurrentLinkedQueue<EventAndContext> queue = buffer.get(realmId); + if (queue == null || queue.isEmpty()) { + return; + } + + EventAndContext head = queue.peek(); + if (head == null) { + return; + } + + Duration elapsed = Duration.ofMillis(clock.millis() - head.polarisEvent.getTimestampMs()); + + if (elapsed.compareTo(timeToFlush) > 0 || queue.size() >= maxBufferSize || forceFlush) { + // Atomically replace old queue with new queue + boolean replaced = buffer.replace(realmId, queue, new ConcurrentLinkedQueue<>()); Review Comment: Wanted to 
check one thing on this - I believe `ConcurrentLinkedQueue` never overrides `equals` from the `Object` class anywhere in its hierarchy, so technically this `replace` call only succeeds if the map still holds the exact same queue instance (reference equality), right? Am I missing some implementation override along the way? ########## runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.SecurityContext; +import java.time.Clock; +import java.time.Duration; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + private static final String REQUEST_ID_KEY = "requestId"; + private final MetaStoreManagerFactory metaStoreManagerFactory; + + private final ConcurrentHashMap<String, ConcurrentLinkedQueue<EventAndContext>> buffer = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor; + private final ConcurrentHashMap<Future<?>, Integer> futures = new ConcurrentHashMap<>(); + private final Duration timeToFlush; + private final int maxBufferSize; + + @Inject CallContext callContext; + @Inject Clock clock; + @Context SecurityContext securityContext; + @Context ContainerRequestContext containerRequestContext; + + private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext callContext) {} + + @Inject + public InMemoryBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + Clock clock, + InMemoryBufferEventListenerConfiguration eventListenerConfiguration) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.clock = clock; + this.timeToFlush = eventListenerConfiguration.bufferTime(); + this.maxBufferSize = eventListenerConfiguration.maxBufferSize(); + + executor = Executors.newSingleThreadScheduledExecutor(); + } + + @PostConstruct + void start() { + futures.put( + executor.scheduleAtFixedRate( + this::runCleanup, 0, timeToFlush.toMillis(), TimeUnit.MILLISECONDS), + 1); + } + + void runCleanup() { + for (String realmId : buffer.keySet()) { + try { + checkAndFlushBufferIfNecessary(realmId, false); + } catch (Exception e) { + LOGGER.debug("Buffer checking task failed for realm ({}): {}", realmId, e); + } + } + // Clean up futures + try { + futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone()); + } catch (Exception e) { + LOGGER.debug("Futures reaper task failed."); + } + } + + @PreDestroy + void shutdown() { + futures.keySet().forEach(future -> future.cancel(false)); + executor.shutdownNow(); + + try { + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + LOGGER.warn("Executor did not shut down cleanly"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + for (String realmId : buffer.keySet()) { + try { + checkAndFlushBufferIfNecessary(realmId, true); + } catch (Exception e) { + LOGGER.debug("Buffer flushing task failed for realm ({}): ", realmId, e); + } + } + } + } + + @Override + String getRequestId() { + if (containerRequestContext != null && containerRequestContext.hasProperty(REQUEST_ID_KEY)) { + return (String) containerRequestContext.getProperty(REQUEST_ID_KEY); + } + return UUID.randomUUID().toString(); + } + + @Override + void addToBuffer(PolarisEvent polarisEvent) { + String realmId = callContext.getRealmContext().getRealmIdentifier(); + + buffer + .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>()) + .add(new EventAndContext(polarisEvent, callContext.getPolarisCallContext().copy())); + if (buffer.get(realmId).size() >= maxBufferSize) { + futures.put(executor.submit(() -> checkAndFlushBufferIfNecessary(realmId, true)), 1); + } + } + + @VisibleForTesting + void checkAndFlushBufferIfNecessary(String realmId, boolean forceFlush) { + ConcurrentLinkedQueue<EventAndContext> queue = buffer.get(realmId); + if (queue == null || queue.isEmpty()) { + return; + } + + EventAndContext head = queue.peek(); + if (head == null) { + return; + } + + Duration elapsed = Duration.ofMillis(clock.millis() - head.polarisEvent.getTimestampMs()); + + if (elapsed.compareTo(timeToFlush) > 0 || queue.size() >= maxBufferSize || forceFlush) { + // Atomically replace old queue with new queue + boolean replaced = buffer.replace(realmId, queue, new ConcurrentLinkedQueue<>()); + if (!replaced) { + // 
Another thread concurrently modified the buffer, so do not continue + return; + } + + metaStoreManagerFactory Review Comment: I'm glad you raised this - I thought it would be more frowned upon to recreate the MetaStoreManager each time instead of just keeping a handle on the PolarisCallContext. But this is the cleaner solution, I agree! Thank you! ########## runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.polaris.service.events.listeners; + +import java.util.Map; +import org.apache.iceberg.TableMetadataParser; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.service.events.AfterCatalogCreatedEvent; +import org.apache.polaris.service.events.AfterTableCommitedEvent; +import org.apache.polaris.service.events.AfterTableCreatedEvent; +import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.AfterTaskAttemptedEvent; +import org.apache.polaris.service.events.AfterViewCommitedEvent; +import org.apache.polaris.service.events.AfterViewRefreshedEvent; +import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.BeforeTableCommitedEvent; +import org.apache.polaris.service.events.BeforeTableRefreshedEvent; +import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.BeforeViewCommitedEvent; +import org.apache.polaris.service.events.BeforeViewRefreshedEvent; + +public abstract class PolarisPersistenceEventListener extends PolarisEventListener { + + // TODO: Ensure all events (except RateLimiter ones) call `addToBuffer` + @Override + public final void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent event) {} + + @Override + public void onBeforeTableCommited(BeforeTableCommitedEvent event) {} + + @Override + public void onAfterTableCommited(AfterTableCommitedEvent event) {} + + @Override + public void onBeforeViewCommited(BeforeViewCommitedEvent event) {} + + @Override + public void onAfterViewCommited(AfterViewCommitedEvent event) {} + + @Override + public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) {} + + @Override + public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {} + + @Override + public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) {} + + @Override + public void onAfterViewRefreshed(AfterViewRefreshedEvent 
event) {} + + @Override + public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {} + + @Override + public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {} + + @Override + public void onAfterTableCreated(AfterTableCreatedEvent event) { + ContextSpecificInformation contextSpecificInformation = getContextSpecificInformation(); + org.apache.polaris.core.entity.PolarisEvent polarisEvent = + new org.apache.polaris.core.entity.PolarisEvent( + event.catalogName(), + event.eventId(), + getRequestId(), + event.getClass().getSimpleName(), + contextSpecificInformation.timestamp(), + contextSpecificInformation.principalName(), + PolarisEvent.ResourceType.TABLE, + event.identifier().toString()); + Map<String, String> additionalParameters = + Map.of( + "table-uuid", + event.metadata().uuid(), + "metadata", + TableMetadataParser.toJson(event.metadata())); + polarisEvent.setAdditionalProperties(additionalParameters); + addToBuffer(polarisEvent); + } + + @Override + public void onAfterCatalogCreated(AfterCatalogCreatedEvent event) { + ContextSpecificInformation contextSpecificInformation = getContextSpecificInformation(); + org.apache.polaris.core.entity.PolarisEvent polarisEvent = + new PolarisEvent( + event.catalogName(), + event.eventId(), + getRequestId(), + event.getClass().getSimpleName(), + contextSpecificInformation.timestamp(), + contextSpecificInformation.principalName(), + PolarisEvent.ResourceType.CATALOG, + event.catalogName()); + addToBuffer(polarisEvent); + } + + protected record ContextSpecificInformation(long timestamp, String principalName) {} Review Comment: I did this to make the code more DRY - if there are more Context-Specific pieces of information, then we can add them here as well and keep the code tight. If you disagree, I'm okay with undoing this record. 
########## runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.SecurityContext; +import java.time.Clock; +import java.time.Duration; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import 
org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. */ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + private static final String REQUEST_ID_KEY = "requestId"; + private final MetaStoreManagerFactory metaStoreManagerFactory; + + private final ConcurrentHashMap<String, ConcurrentLinkedQueue<EventAndContext>> buffer = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor; + private final ConcurrentHashMap<Future<?>, Integer> futures = new ConcurrentHashMap<>(); + private final Duration timeToFlush; + private final int maxBufferSize; + + @Inject CallContext callContext; + @Inject Clock clock; + @Context SecurityContext securityContext; + @Context ContainerRequestContext containerRequestContext; + + private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext callContext) {} + + @Inject + public InMemoryBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + Clock clock, + InMemoryBufferEventListenerConfiguration eventListenerConfiguration) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.clock = clock; + this.timeToFlush = eventListenerConfiguration.bufferTime(); + this.maxBufferSize = eventListenerConfiguration.maxBufferSize(); + + executor = Executors.newSingleThreadScheduledExecutor(); + } + + @PostConstruct + void start() { + futures.put( + executor.scheduleAtFixedRate( + this::runCleanup, 0, timeToFlush.toMillis(), TimeUnit.MILLISECONDS), + 1); + } + + void runCleanup() { + for (String realmId : buffer.keySet()) { + try { + checkAndFlushBufferIfNecessary(realmId, false); + } catch (Exception 
e) { + LOGGER.debug("Buffer checking task failed for realm ({}): {}", realmId, e); + } + } + // Clean up futures + try { + futures.keySet().removeIf(future -> future.isCancelled() || future.isDone()); + } catch (Exception e) { + LOGGER.debug("Futures reaper task failed."); + } + } + + @PreDestroy + void shutdown() { + futures.keySet().forEach(future -> future.cancel(false)); + executor.shutdownNow(); + + try { + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + LOGGER.warn("Executor did not shut down cleanly"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + for (String realmId : buffer.keySet()) { + try { + checkAndFlushBufferIfNecessary(realmId, true); + } catch (Exception e) { + LOGGER.debug("Buffer flushing task failed for realm ({}): ", realmId, e); + } + } + } + } + + @Override + String getRequestId() { + if (containerRequestContext != null && containerRequestContext.hasProperty(REQUEST_ID_KEY)) { + return (String) containerRequestContext.getProperty(REQUEST_ID_KEY); + } + return UUID.randomUUID().toString(); + } + + @Override + void addToBuffer(PolarisEvent polarisEvent) { + String realmId = callContext.getRealmContext().getRealmIdentifier(); + + buffer + .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>()) + .add(new EventAndContext(polarisEvent, callContext.getPolarisCallContext().copy())); + if (buffer.get(realmId).size() >= maxBufferSize) { Review Comment: Good call! Changed. ########## runtime/service/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java: ########## @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.listeners; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import jakarta.ws.rs.container.ContainerRequestContext; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.threeten.extra.MutableClock; + +public class InMemoryBufferPolarisPersistenceEventListenerTest { + private InMemoryBufferPolarisPersistenceEventListener eventListener; + private PolarisMetaStoreManager polarisMetaStoreManager; + private MutableClock clock; + private CallContext callContext; + + private static final int 
CONFIG_MAX_BUFFER_SIZE = 5; + private static final Duration CONFIG_TIME_TO_FLUSH_IN_MS = Duration.ofMillis(500); + + @BeforeEach + public void setUp() { + callContext = Mockito.mock(CallContext.class); + PolarisCallContext polarisCallContext = Mockito.mock(PolarisCallContext.class); + when(callContext.getPolarisCallContext()).thenReturn(polarisCallContext); + when(polarisCallContext.copy()).thenReturn(polarisCallContext); + + MetaStoreManagerFactory metaStoreManagerFactory = Mockito.mock(MetaStoreManagerFactory.class); + polarisMetaStoreManager = Mockito.mock(PolarisMetaStoreManager.class); + when(metaStoreManagerFactory.getOrCreateMetaStoreManager(Mockito.any())) + .thenReturn(polarisMetaStoreManager); + + InMemoryBufferEventListenerConfiguration eventListenerConfiguration = + Mockito.mock(InMemoryBufferEventListenerConfiguration.class); + when(eventListenerConfiguration.maxBufferSize()).thenReturn(CONFIG_MAX_BUFFER_SIZE); + when(eventListenerConfiguration.bufferTime()).thenReturn(CONFIG_TIME_TO_FLUSH_IN_MS); + + clock = + MutableClock.of( + Instant.ofEpochSecond(0), ZoneOffset.UTC); // Use 0 Epoch Time to make it easier to test + + eventListener = + new InMemoryBufferPolarisPersistenceEventListener( + metaStoreManagerFactory, clock, eventListenerConfiguration); + + // Use reflection to set the callContext field + try { + java.lang.reflect.Field field = + InMemoryBufferPolarisPersistenceEventListener.class.getDeclaredField("callContext"); + field.setAccessible(true); + field.set(eventListener, callContext); + } catch (Exception e) { + throw new RuntimeException("Failed to set callContext field", e); + } + } + + @Test + public void testAddToBufferFlushesAfterConfiguredTime() { + String realmId = "realm1"; + List<PolarisEvent> eventsAddedToBuffer = addEventsWithoutTriggeringFlush(realmId); + + // Push clock forwards to flush the buffer + clock.add(CONFIG_TIME_TO_FLUSH_IN_MS.multipliedBy(2)); + eventListener.checkAndFlushBufferIfNecessary(realmId, false); + 
verify(polarisMetaStoreManager, times(1)) + .writeEvents(eq(callContext.getPolarisCallContext()), eq(eventsAddedToBuffer)); + } + + @Test + public void testAddToBufferFlushesAfterMaxEvents() { + String realm1 = "realm1"; + List<PolarisEvent> eventsAddedToBuffer = addEventsWithoutTriggeringFlush(realm1); + List<PolarisEvent> eventsAddedToBufferRealm2 = addEventsWithoutTriggeringFlush("realm2"); + + // Add the last event for realm1 and verify that it did trigger the flush + PolarisEvent triggeringEvent = createSampleEvent(); + RealmContext realmContext = () -> realm1; + when(callContext.getRealmContext()).thenReturn(realmContext); + eventListener.addToBuffer(triggeringEvent); + eventsAddedToBuffer.add(triggeringEvent); + + // Calling checkAndFlushBufferIfNecessary manually to replicate the behavior of the executor + // service + eventListener.checkAndFlushBufferIfNecessary(realm1, false); + verify(polarisMetaStoreManager, times(1)) + .writeEvents(eq(callContext.getPolarisCallContext()), eq(eventsAddedToBuffer)); + verify(polarisMetaStoreManager, times(0)) + .writeEvents(eq(callContext.getPolarisCallContext()), eq(eventsAddedToBufferRealm2)); + } + + @Test + public void testCheckAndFlushBufferIfNecessaryIsThreadSafe() throws Exception { + String realmId = "realm1"; + int threadCount = 10; + List<Thread> threads = new ArrayList<>(); + ConcurrentLinkedQueue<Exception> exceptions = new ConcurrentLinkedQueue<>(); + + // Pre-populate the buffer with events + List<PolarisEvent> events = addEventsWithoutTriggeringFlush(realmId); + + // Push clock forwards to flush the buffer + clock.add(CONFIG_TIME_TO_FLUSH_IN_MS.multipliedBy(2)); + + // Each thread will call checkAndFlushBufferIfNecessary concurrently + for (int i = 0; i < threadCount; i++) { + Thread t = + new Thread( + () -> { + try { + eventListener.checkAndFlushBufferIfNecessary(realmId, false); + } catch (Exception e) { + exceptions.add(e); + } + }); + threads.add(t); + } + // Start all threads + 
threads.forEach(Thread::start); + // Wait for all threads to finish + for (Thread t : threads) { + t.join(); + } + // There should be no exceptions + if (!exceptions.isEmpty()) { + throw new AssertionError( + "Exceptions occurred in concurrent checkAndFlushBufferIfNecessary: ", exceptions.peek()); + } + // Only one flush should occur + verify(polarisMetaStoreManager, times(1)) + .writeEvents(eq(callContext.getPolarisCallContext()), eq(events)); + } + + @Test + public void testRequestIdFunctionalityWithContainerRequestContext() { + // Test when containerRequestContext has requestId property + ContainerRequestContext mockContainerRequestContext = + Mockito.mock(ContainerRequestContext.class); + String expectedRequestId = "custom-request-id-123"; + + when(mockContainerRequestContext.hasProperty("requestId")).thenReturn(true); + when(mockContainerRequestContext.getProperty("requestId")).thenReturn(expectedRequestId); + + // Use reflection to set the containerRequestContext field + try { + java.lang.reflect.Field field = + InMemoryBufferPolarisPersistenceEventListener.class.getDeclaredField( + "containerRequestContext"); + field.setAccessible(true); + field.set(eventListener, mockContainerRequestContext); + } catch (Exception e) { + throw new RuntimeException("Failed to set containerRequestContext field", e); + } Review Comment: Sorry, this is leftover from a previous revision in which this was not possible. Thank you for pointing it out! I changed all other occurrences of this pattern as well. ########## runtime/service/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java: ########## @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.listeners; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import jakarta.ws.rs.container.ContainerRequestContext; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.threeten.extra.MutableClock; + +public class InMemoryBufferPolarisPersistenceEventListenerTest { + private InMemoryBufferPolarisPersistenceEventListener eventListener; + private PolarisMetaStoreManager polarisMetaStoreManager; + private MutableClock clock; + private CallContext callContext; + + private static final int 
CONFIG_MAX_BUFFER_SIZE = 5; + private static final Duration CONFIG_TIME_TO_FLUSH_IN_MS = Duration.ofMillis(500); + + @BeforeEach + public void setUp() { + callContext = Mockito.mock(CallContext.class); + PolarisCallContext polarisCallContext = Mockito.mock(PolarisCallContext.class); + when(callContext.getPolarisCallContext()).thenReturn(polarisCallContext); + when(polarisCallContext.copy()).thenReturn(polarisCallContext); + + MetaStoreManagerFactory metaStoreManagerFactory = Mockito.mock(MetaStoreManagerFactory.class); + polarisMetaStoreManager = Mockito.mock(PolarisMetaStoreManager.class); + when(metaStoreManagerFactory.getOrCreateMetaStoreManager(Mockito.any())) + .thenReturn(polarisMetaStoreManager); + + InMemoryBufferEventListenerConfiguration eventListenerConfiguration = + Mockito.mock(InMemoryBufferEventListenerConfiguration.class); + when(eventListenerConfiguration.maxBufferSize()).thenReturn(CONFIG_MAX_BUFFER_SIZE); + when(eventListenerConfiguration.bufferTime()).thenReturn(CONFIG_TIME_TO_FLUSH_IN_MS); + + clock = + MutableClock.of( + Instant.ofEpochSecond(0), ZoneOffset.UTC); // Use 0 Epoch Time to make it easier to test + + eventListener = + new InMemoryBufferPolarisPersistenceEventListener( + metaStoreManagerFactory, clock, eventListenerConfiguration); + + // Use reflection to set the callContext field + try { + java.lang.reflect.Field field = + InMemoryBufferPolarisPersistenceEventListener.class.getDeclaredField("callContext"); + field.setAccessible(true); + field.set(eventListener, callContext); + } catch (Exception e) { + throw new RuntimeException("Failed to set callContext field", e); + } + } + + @Test + public void testAddToBufferFlushesAfterConfiguredTime() { + String realmId = "realm1"; + List<PolarisEvent> eventsAddedToBuffer = addEventsWithoutTriggeringFlush(realmId); + + // Push clock forwards to flush the buffer + clock.add(CONFIG_TIME_TO_FLUSH_IN_MS.multipliedBy(2)); + eventListener.checkAndFlushBufferIfNecessary(realmId, false); + 
verify(polarisMetaStoreManager, times(1)) + .writeEvents(eq(callContext.getPolarisCallContext()), eq(eventsAddedToBuffer)); + } + + @Test + public void testAddToBufferFlushesAfterMaxEvents() { + String realm1 = "realm1"; + List<PolarisEvent> eventsAddedToBuffer = addEventsWithoutTriggeringFlush(realm1); + List<PolarisEvent> eventsAddedToBufferRealm2 = addEventsWithoutTriggeringFlush("realm2"); + + // Add the last event for realm1 and verify that it did trigger the flush + PolarisEvent triggeringEvent = createSampleEvent(); + RealmContext realmContext = () -> realm1; + when(callContext.getRealmContext()).thenReturn(realmContext); + eventListener.addToBuffer(triggeringEvent); + eventsAddedToBuffer.add(triggeringEvent); + + // Calling checkAndFlushBufferIfNecessary manually to replicate the behavior of the executor + // service + eventListener.checkAndFlushBufferIfNecessary(realm1, false); + verify(polarisMetaStoreManager, times(1)) + .writeEvents(eq(callContext.getPolarisCallContext()), eq(eventsAddedToBuffer)); + verify(polarisMetaStoreManager, times(0)) + .writeEvents(eq(callContext.getPolarisCallContext()), eq(eventsAddedToBufferRealm2)); + } + + @Test + public void testCheckAndFlushBufferIfNecessaryIsThreadSafe() throws Exception { Review Comment: Added a new test in the revision that should address this. ########## runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.SecurityContext; +import java.time.Clock; +import java.time.Duration; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + private static final String REQUEST_ID_KEY = "requestId"; + private final MetaStoreManagerFactory metaStoreManagerFactory; + + private final ConcurrentHashMap<String, ConcurrentLinkedQueue<EventAndContext>> buffer = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor; + private final ConcurrentHashMap<Future<?>, Integer> futures = new ConcurrentHashMap<>(); + private final Duration timeToFlush; + private final int maxBufferSize; + + @Inject CallContext callContext; + @Inject Clock clock; + @Context SecurityContext securityContext; + @Context ContainerRequestContext containerRequestContext; + + private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext callContext) {} + + @Inject + public InMemoryBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + Clock clock, + InMemoryBufferEventListenerConfiguration eventListenerConfiguration) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.clock = clock; + this.timeToFlush = eventListenerConfiguration.bufferTime(); + this.maxBufferSize = eventListenerConfiguration.maxBufferSize(); + + executor = Executors.newSingleThreadScheduledExecutor(); + } + + @PostConstruct + void start() { + futures.put( + executor.scheduleAtFixedRate( + this::runCleanup, 0, timeToFlush.toMillis(), TimeUnit.MILLISECONDS), + 1); + } + + void runCleanup() { + for (String realmId : buffer.keySet()) { + try { + checkAndFlushBufferIfNecessary(realmId, false); + } catch (Exception e) { + LOGGER.debug("Buffer checking task failed for realm ({}): {}", realmId, e); + } + } + // Clean up futures + try { + futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone()); Review Comment: Great catch, changed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
