pingtimeout commented on code in PR #1339: URL: https://github.com/apache/polaris/pull/1339#discussion_r2034662023
########## polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/IndexedCache.java: ########## @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.persistence.cache; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Ticker; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Striped; +import java.time.Duration; +import java.util.ArrayDeque; +import java.util.Comparator; +import java.util.Deque; +import java.util.LinkedHashSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.function.Function; +import java.util.function.Supplier; + +// Source: +// https://github.com/ben-manes/caffeine/blob/master/examples/indexable/src/main/java/com/github/benmanes/caffeine/examples/indexable/IndexedCache.java + +/** + * A cache abstraction that allows the entry to looked up by alternative keys. 
This approach mirrors + * a database table where a row is stored by its primary key, it contains all of the columns that + * identify it, and the unique indexes are additional mappings defined by the column mappings. This + * class similarly stores the in the value once in the cache by its primary key and maintains a + * secondary mapping for lookups by using indexing functions to derive the keys. + * + * @author [email protected] (Ben Manes) + */ +final class IndexedCache<K, V> { + private final ConcurrentMap<K, LinkedHashSet<K>> indexes; + private final LinkedHashSet<Function<V, K>> indexers; + private final Striped<Lock> locks; + private final Cache<K, V> store; + + private IndexedCache( + Caffeine<Object, Object> cacheBuilder, LinkedHashSet<Function<V, K>> indexers) { + this.indexes = new ConcurrentHashMap<>(); + this.locks = Striped.lock(1_024); + this.indexers = indexers; + this.store = + cacheBuilder + .evictionListener((key, value, cause) -> indexes.keySet().removeAll(indexes.get(key))) + .build(); + } + + /** Returns the value associated with the key or {@code null} if not found. */ + V getIfPresent(K key) { + var index = indexes.get(key); + return (index == null) ? null : store.getIfPresent(index.iterator().next()); + } + + /** + * Returns the value associated with the key, obtaining that value from the {@code loader} if + * necessary. The entire method invocation is performed atomically, so the function is applied at + * most once per key. As the value may be looked up by alternative keys, those function + * invocations may be executed in parallel and will replace any existing mappings when completed. 
+ */ + V get(K key, Supplier<V> loader) { + var value = getIfPresent(key); + if (value != null) { + return value; + } + + var lock = locks.get(key); + lock.lock(); + try { + value = getIfPresent(key); + if (value != null) { + return value; + } + + value = loader.get(); + if (value == null) { + return null; + } + + put(value); + return value; + } finally { + lock.unlock(); + } + } + + /** Associates the {@code value} with its keys, replacing the old value and keys if present. */ + private void put(V value) { + putIfNewer(value, (oldValue, newValue) -> 1); + } + + /** + * Associates the {@code newValue} with its keys if it is newer than the existing value according + * to the provided {@code comparator}. If the new value is added, the old value and its associated + * keys are replaced. + * + * @param newValue the new value to be added to the cache + * @param comparator a comparator to determine if the new value is newer than the existing value. + * The comparator must support `null` values in case no value exist in the cache prior to + * invocation. + */ + private void putIfNewer(V newValue, Comparator<V> comparator) { + requireNonNull(newValue); + var index = buildIndex(newValue); + store + .asMap() + .compute( + index.iterator().next(), + (key, oldValue) -> { + if (comparator.compare(oldValue, newValue) > 0) { + if (oldValue != null) { + indexes + .keySet() + .removeAll(Sets.difference(indexes.get(index.iterator().next()), index)); + } + for (var indexKey : index) { + indexes.put(indexKey, index); + } + return newValue; + } else { + return oldValue; + } + }); + } + + /** Discards any cached value and its keys. 
*/ + void invalidate(K key) { + var index = indexes.get(key); + if (index == null) { + return; + } + + store + .asMap() + .computeIfPresent( + index.iterator().next(), + (k, v) -> { + indexes.keySet().removeAll(indexes.get(key)); Review Comment: 👍 Thanks, fixing that ########## gradle/libs.versions.toml: ########## @@ -97,6 +97,7 @@ threeten-extra = { module = "org.threeten:threeten-extra", version = "1.8.0" } [plugins] jandex = { id = "org.kordamp.gradle.jandex", version = "2.1.0" } +jcstress = { id = "io.github.reyerizo.gradle.jcstress", version = "0.8.15" } Review Comment: There are three asks in this comment. **1 - Explicitly add jcstress-core as a library** Wouldn't this be over-engineering? AFAICT this is only in preparation of a hypothetical need to use a different jcstress version than the default one used by the gradle plugin. **2 - Add dependency between jcstress compilation tasks and jandex** I am not sure what these are for. The jcstress tests are working without those. Can you elaborate a bit? **3 - Run jcstress tests with the `check` phase** I would be very careful about the implications of this ask. This can substantially increase the duration of the builds. As I wrote in another comment, stress tests are good for proving that issues exist and give good confidence that they are fixed. I don't think they should be executed against every commit. More about this: before submitting this PR, I ran the jcstress tests with the `tough` mode, which took 15 minutes at 100% CPU usage on all 16 cores of my machine. No issue was found. I think it is better to work that way (i.e. run long tests before submitting a PR) than to enforce expensive tests which will find nothing 99.9% of the time. ########## jcstress-tests/src/jcstress/java/org/apache/polaris/core/persistence/cache/EntityCacheCoherenceTest.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.persistence.cache; + +import static org.apache.polaris.core.entity.PolarisEntityType.CATALOG; +import static org.apache.polaris.core.persistence.cache.FakeMetaStoreManager.CATALOG_ID; +import static org.openjdk.jcstress.annotations.Expect.*; + +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.persistence.ResolvedPolarisEntity; +import org.openjdk.jcstress.annotations.*; +import org.openjdk.jcstress.infra.results.LL_Result; + +public class EntityCacheCoherenceTest { + /** The cache key associated with the catalog that is returned by the fake meta store manager. */ + private static final EntityCacheByNameKey KEY = new EntityCacheByNameKey(CATALOG, "test"); + + @JCStressTest + @Description( + "Tests that the two caches inside EntityCache are coherent with one another. In this test, two" + + " actors are calling getOrLoadById and getOrLoadByName. The entities received by the" + + " actors are not checked as part of this test. Instead, an arbiter runs after the" + + " actors have performed their calls and checks the version of the entity that are in" + + " each of the two caches using the getter methods getEntityById and getEntityByName. 
" + + " Expected behaviour is that the two caches return the same entity version. Any" + + " discrepancy indicates that the caches are out of sync.") + @Outcome.Outcomes({ + @Outcome(id = "1, 1", expect = ACCEPTABLE, desc = "The caches are in sync"), + @Outcome(id = "2, 2", expect = ACCEPTABLE, desc = "The caches are in sync"), + @Outcome(id = "2, 1", expect = FORBIDDEN, desc = "The caches are out of sync"), Review Comment: Let me add that outcome to be comprehensive. Really, `1, 2` is just another "The caches are out of sync" case and is already caught by the `expect = UNKNOWN` outcome. Any unknown outcome results in the JCStress test to fail. But it is better to be thorough 👍. ########## polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java: ########## @@ -18,185 +18,74 @@ */ package org.apache.polaris.core.persistence.cache; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalListener; +import com.google.common.util.concurrent.Striped; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; -import java.util.AbstractMap; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; +import java.time.Duration; +import java.util.Comparator; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisEntityType; -import org.apache.polaris.core.entity.PolarisGrantRecord; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.core.persistence.ResolvedPolarisEntity; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; /** The entity cache, can be private or shared */ public class EntityCache { + private static final 
Comparator<ResolvedPolarisEntity> RESOLVED_POLARIS_ENTITY_COMPARATOR = Review Comment: It is not... It is a leftover from a tentative rewrite of `getAndRefreshIfNeeded`. Removing... ########## polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java: ########## @@ -255,104 +150,74 @@ public void setCacheMode(EntityCacheMode cacheMode) { @Nonnull PolarisBaseEntity entityToValidate, int entityMinVersion, int entityGrantRecordsMinVersion) { - long entityCatalogId = entityToValidate.getCatalogId(); long entityId = entityToValidate.getId(); - PolarisEntityType entityType = entityToValidate.getType(); - // first lookup the cache to find the existing cache entry + // First lookup the cache to find the existing cache entry ResolvedPolarisEntity existingCacheEntry = this.getEntityById(entityId); - // the caller's fetched entity may have come from a stale lookup byName; we should consider - // the existingCacheEntry to be the older of the two for purposes of invalidation to make - // sure when we replaceCacheEntry we're also removing the old name if it's no longer valid - EntityCacheByNameKey nameKey = new EntityCacheByNameKey(entityToValidate); - ResolvedPolarisEntity existingCacheEntryByName = this.getEntityByName(nameKey); - if (existingCacheEntryByName != null - && existingCacheEntry != null - && isNewer(existingCacheEntry, existingCacheEntryByName)) { - existingCacheEntry = existingCacheEntryByName; + // See if we need to load or refresh that entity + if (existingCacheEntry != null + && existingCacheEntry.getEntity().getEntityVersion() >= entityMinVersion + && existingCacheEntry.getEntity().getGrantRecordsVersion() + >= entityGrantRecordsMinVersion) { + // Cache hit and cache entry is up to date, nothing to do. 
+ return existingCacheEntry; } - // the new one to be returned - final ResolvedPolarisEntity newCacheEntry; - - // see if we need to load or refresh that entity - if (existingCacheEntry == null - || existingCacheEntry.getEntity().getEntityVersion() < entityMinVersion - || existingCacheEntry.getEntity().getGrantRecordsVersion() < entityGrantRecordsMinVersion) { + // Either cache miss, dropped entity or stale entity. In either case, invalidate and reload it. + // Do the refresh in a critical section based on the entity ID to avoid race conditions. + Lock lock = this.locks.get(entityId); + try { + lock.lock(); + + // Lookup the cache again in case another thread has already invalidated it. + existingCacheEntry = this.getEntityById(entityId); + + // See if the entity has been refreshed concurrently + if (existingCacheEntry != null + && existingCacheEntry.getEntity().getEntityVersion() >= entityMinVersion + && existingCacheEntry.getEntity().getGrantRecordsVersion() + >= entityGrantRecordsMinVersion) { + // Cache hit and cache entry is up to date now, exit + return existingCacheEntry; + } - // the refreshed entity - final ResolvedEntityResult refreshedCacheEntry; + // We are the first to act upon this entity id, invalidate it + this.cache.invalidate(new IdKey(entityId)); - // was not found in the cache? 
- final PolarisBaseEntity entity; - final List<PolarisGrantRecord> grantRecords; - final int grantRecordsVersion; - if (existingCacheEntry == null) { - // try to load it - refreshedCacheEntry = - this.polarisMetaStoreManager.loadResolvedEntityById( - callContext, entityCatalogId, entityId, entityType); - if (refreshedCacheEntry.isSuccess()) { - entity = refreshedCacheEntry.getEntity(); - grantRecords = refreshedCacheEntry.getEntityGrantRecords(); - grantRecordsVersion = refreshedCacheEntry.getGrantRecordsVersion(); - } else { - return null; - } - } else { - // refresh it - refreshedCacheEntry = - this.polarisMetaStoreManager.refreshResolvedEntity( - callContext, - existingCacheEntry.getEntity().getEntityVersion(), - existingCacheEntry.getEntity().getGrantRecordsVersion(), - entityType, - entityCatalogId, - entityId); - if (refreshedCacheEntry.isSuccess()) { - entity = - (refreshedCacheEntry.getEntity() != null) - ? refreshedCacheEntry.getEntity() - : existingCacheEntry.getEntity(); - if (refreshedCacheEntry.getEntityGrantRecords() != null) { - grantRecords = refreshedCacheEntry.getEntityGrantRecords(); - grantRecordsVersion = refreshedCacheEntry.getGrantRecordsVersion(); - } else { - grantRecords = existingCacheEntry.getAllGrantRecords(); - grantRecordsVersion = existingCacheEntry.getEntity().getGrantRecordsVersion(); - } - } else { - // entity has been purged, remove it - this.removeCacheEntry(existingCacheEntry); - return null; - } + // Get the entity from the cache or reload it now that it has been invalidated + EntityCacheLookupResult cacheLookupResult = + this.getOrLoadEntityById( Review Comment: The goal of the EntityCache is not to be up to date 100% of the time. The `getAndRefreshIfNeeded` method is the only one that provides means to require an entity to be _sufficiently up to date_. The lock protects the `read-then-write` aspect of the operations (1) get the entity, (2) reload it if it is stale. 
See my response to Robert a couple of lines above this one for more. To be clear, it is possible that there are still unidentified race conditions in the code. The fact that the cache has two different ways of accessing the same entity is a very tricky design. ########## polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java: ########## @@ -18,185 +18,74 @@ */ package org.apache.polaris.core.persistence.cache; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalListener; +import com.google.common.util.concurrent.Striped; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; -import java.util.AbstractMap; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; +import java.time.Duration; +import java.util.Comparator; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisEntityType; -import org.apache.polaris.core.entity.PolarisGrantRecord; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.core.persistence.ResolvedPolarisEntity; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; /** The entity cache, can be private or shared */ public class EntityCache { + private static final Comparator<ResolvedPolarisEntity> RESOLVED_POLARIS_ENTITY_COMPARATOR = + Comparator.nullsLast( + Comparator.<ResolvedPolarisEntity>comparingInt(rpe -> rpe.getEntity().getEntityVersion()) + .thenComparingInt(rpe -> rpe.getEntity().getGrantRecordsVersion())); + private static final int STRIPES = 1_024; // cache mode private EntityCacheMode cacheMode; // the meta store manager private final PolarisMetaStoreManager polarisMetaStoreManager; - 
// Caffeine cache to keep entries by id - private final Cache<Long, ResolvedPolarisEntity> byId; + // Caffeine cache to keep entries + private final IndexedCache<CacheKey, ResolvedPolarisEntity> cache; - // index by name - private final AbstractMap<EntityCacheByNameKey, ResolvedPolarisEntity> byName; + // Locks to ensure that an entity can only be refreshed by one thread at a time + private final Striped<Lock> locks; /** * Constructor. Cache can be private or shared * * @param polarisMetaStoreManager the meta store manager implementation */ public EntityCache(@Nonnull PolarisMetaStoreManager polarisMetaStoreManager) { - - // by name cache - this.byName = new ConcurrentHashMap<>(); - - // When an entry is removed, we simply remove it from the byName map - RemovalListener<Long, ResolvedPolarisEntity> removalListener = - (key, value, cause) -> { - if (value != null) { - // compute name key - EntityCacheByNameKey nameKey = new EntityCacheByNameKey(value.getEntity()); - - // if it is still active, remove it from the name key - this.byName.remove(nameKey, value); - } - }; - - // use a Caffeine cache to purge entries when those have not been used for a long time. - // Assuming 1KB per entry, 100K entries is about 100MB. - this.byId = - Caffeine.newBuilder() - .maximumSize(100_000) // Set maximum size to 100,000 elements - .expireAfterAccess(1, TimeUnit.HOURS) // Expire entries after 1 hour of no access - .removalListener(removalListener) // Set the removal listener - .build(); - // remember the meta store manager this.polarisMetaStoreManager = polarisMetaStoreManager; // enabled by default this.cacheMode = EntityCacheMode.ENABLE; + + // Use a Caffeine cache to purge entries when those have not been used for a long time. + // Assuming 1KB per entry, 100K entries is about 100MB. Note that each entity is stored twice in + // the cache, once indexed by its identifier and once indexed by its name. 
+ this.cache = + new IndexedCache.Builder<CacheKey, ResolvedPolarisEntity>() + .primaryKey(e -> new IdKey(e.getEntity().getId())) + .addSecondaryKey(e -> new NameKey(new EntityCacheByNameKey(e.getEntity()))) + .expireAfterWrite(Duration.ofMinutes(5)) + .maximumSize(10_000) Review Comment: Good point. I did not look closely enough. I will add `expireAfterAccess` to the IndexedCache Builder class and leverage it. ########## jcstress-tests/src/jcstress/java/org/apache/polaris/core/persistence/cache/FakeMetaStoreManager.java: ########## @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.core.persistence.cache; + +import static java.util.Collections.emptyList; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.entity.*; +import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.apache.polaris.core.persistence.dao.entity.*; +import org.apache.polaris.core.storage.PolarisStorageActions; + +/** + * Build a fake meta store manager that returns a single catalog entity. Every time the catalog is + * returned, an incremented entity version id is used. this is in order to disambiguate the entity + * returned by the cache and verify whether the cache is thread safe. + * + * <p>Think of it as a poor-man's Mockito. It is not a real mock, but it is good enough for this + * test. More importantly, it is a lot faster to instantiate than Mockito. It allows the stress test + * to run up to 100x more iterations in the same amount of time. + * + * <p>The only implemented methods are `loadResolvedEntityById` and `loadResolvedEntityByName`. Any + * time they are invoked, regardless of their parameters, the same catalog is returned but with an + * increased entity version id. 
+ */ +public class FakeMetaStoreManager implements PolarisMetaStoreManager { + public static final int CATALOG_ID = 42; + private final Supplier<ResolvedEntityResult> catalogSupplier; + + public FakeMetaStoreManager() { + final AtomicInteger versionCounter = new AtomicInteger(1); + this.catalogSupplier = + () -> { + int version = versionCounter.getAndIncrement(); + CatalogEntity catalog = + new CatalogEntity.Builder() + .setId(CATALOG_ID) + .setInternalProperties(Map.of()) + .setProperties(Map.of()) + .setName("test") + .setParentId(PolarisEntityConstants.getRootEntityId()) + .setEntityVersion(version) + .build(); + return new ResolvedEntityResult(catalog, version, emptyList()); + }; + } + + /** + * Utility method that ensures that catalogs creation is thread safe. + * + * @return a catalog entity with an incremented version id + */ + private synchronized ResolvedEntityResult nextResult() { Review Comment: With the previous implementation, it was possible that multiple calls to the `PolarisMetaStoreManager` were issued simultaneously for the entity. So synchronization was necessary to ensure that the returned entities were themselves correct with total ordering. Now, with the IndexedCache based implementation, I think that this is not needed anymore. I believe that the cache first gets an internal lock for the key it is about to load **before** loading the associated entity from the `PolarisMetaStoreManager`. So a critical section might be superfluous. But I think it is good to keep it in case anybody wants to run additional tests against the old `EntityCache`. ########## jcstress-tests/src/jcstress/java/org/apache/polaris/core/persistence/cache/EntityCacheGetAndRefreshIfNeededTest.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.persistence.cache; + +import static org.openjdk.jcstress.annotations.Expect.*; + +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.openjdk.jcstress.annotations.*; +import org.openjdk.jcstress.infra.results.II_Result; + +@JCStressTest +@Description( + "Tests getAndRefreshIfNeeded is thread-safe. In this test, two actors are calling" + + " getAndRefreshIfNeeded twice on the same key. Each actor returns the version of the" + + " entity returned by the two calls to getAndRefreshIfNeeded. The method is invoked using" + + " a minimum entity version equal to 2. Expected behaviour is that the two actors get" + + " the same object twice, or for an object to be updated to a newer version between" + + " reads. But the cache should never go backward and server a stale version after a newer" Review Comment: oops ########## bom/build.gradle.kts: ########## @@ -50,5 +50,6 @@ dependencies { api(project(":polaris-quarkus-spark-tests")) api(project(":polaris-tests")) + api(project(":polaris-jcstress-tests")) Review Comment: It could be a valid option too. It would colocate the stress tests with the code that they stress. I did not choose that approach for a couple of reasons: 1. 
There is an `integration-tests` module under the root directory, and stress tests are just another form of testing that is very different from unit testing. 2. Stress tests should not be executed in every build. They should be used to prove out that issues exists, and give reasonable confidence that issues are fixed. So they have a different lifecycle. 3. As I was not sure they would be accepted in the project, because of jcstress GPL, I did not want to have invasive code modifications to existing modules If you really want the stress tests to be colocated with their associated module, I can do that. ########## jcstress-tests/src/jcstress/java/org/apache/polaris/core/persistence/cache/EntityCacheCoherenceTest.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.core.persistence.cache; + +import static org.apache.polaris.core.entity.PolarisEntityType.CATALOG; +import static org.apache.polaris.core.persistence.cache.FakeMetaStoreManager.CATALOG_ID; +import static org.openjdk.jcstress.annotations.Expect.*; + +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.persistence.ResolvedPolarisEntity; +import org.openjdk.jcstress.annotations.*; +import org.openjdk.jcstress.infra.results.LL_Result; + +public class EntityCacheCoherenceTest { + /** The cache key associated with the catalog that is returned by the fake meta store manager. */ + private static final EntityCacheByNameKey KEY = new EntityCacheByNameKey(CATALOG, "test"); + + @JCStressTest + @Description( + "Tests that the two caches inside EntityCache are coherent with one another. In this test, two" + + " actors are calling getOrLoadById and getOrLoadByName. The entities received by the" + + " actors are not checked as part of this test. Instead, an arbiter runs after the" + + " actors have performed their calls and checks the version of the entity that are in" + + " each of the two caches using the getter methods getEntityById and getEntityByName. " + + " Expected behaviour is that the two caches return the same entity version. 
Any" + + " discrepancy indicates that the caches are out of sync.") + @Outcome.Outcomes({ + @Outcome(id = "1, 1", expect = ACCEPTABLE, desc = "The caches are in sync"), + @Outcome(id = "2, 2", expect = ACCEPTABLE, desc = "The caches are in sync"), + @Outcome(id = "2, 1", expect = FORBIDDEN, desc = "The caches are out of sync"), + @Outcome(expect = UNKNOWN, desc = "Not sure what happened"), + }) + @State() + public static class UsingGetters { + private final PolarisCallContext context; + private final EntityCache entityCache; + + public UsingGetters() { + context = new PolarisCallContext(new FakeBasePersistence(), new FakePolarisDiagnostics()); + entityCache = new EntityCache(new FakeMetaStoreManager()); + } + + @Actor + public void actor1() { + entityCache + .getOrLoadEntityById(context, 0L, CATALOG_ID, CATALOG) + .getCacheEntry() + .getEntity() + .getEntityVersion(); + entityCache + .getOrLoadEntityByName(context, KEY) + .getCacheEntry() + .getEntity() + .getEntityVersion(); + } + + @Actor + public void actor2() { + entityCache + .getOrLoadEntityById(context, 0L, CATALOG_ID, CATALOG) + .getCacheEntry() + .getEntity() + .getEntityVersion(); + entityCache + .getOrLoadEntityByName(context, KEY) Review Comment: For the old EntityCache implementation, calling the getters in the reverse order was actuall _nicer_ to the code than calling them in the same order. So I went for the hardest code path upfront. For the new implementation, it does not really matter. No matter which getter is executed first (by name or by ID), once the entity is fetched from the underlying database, the cache will be populated by ID first. And a lock based on the ID will be used to protect the operation. So both orders resolve to the same thing. Do you want me to add an additional case here? 
########## polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java: ########## @@ -18,185 +18,74 @@ */ package org.apache.polaris.core.persistence.cache; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalListener; +import com.google.common.util.concurrent.Striped; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; -import java.util.AbstractMap; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; +import java.time.Duration; +import java.util.Comparator; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisEntityType; -import org.apache.polaris.core.entity.PolarisGrantRecord; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.core.persistence.ResolvedPolarisEntity; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; /** The entity cache, can be private or shared */ public class EntityCache { + private static final Comparator<ResolvedPolarisEntity> RESOLVED_POLARIS_ENTITY_COMPARATOR = + Comparator.nullsLast( + Comparator.<ResolvedPolarisEntity>comparingInt(rpe -> rpe.getEntity().getEntityVersion()) + .thenComparingInt(rpe -> rpe.getEntity().getGrantRecordsVersion())); Review Comment: It is not... It is a leftover from a tentative rewrite of `getAndRefreshIfNeeded`. Removing... 
########## polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java: ########## @@ -365,50 +230,46 @@ && isNewer(existingCacheEntry, existingCacheEntryByName)) { long entityCatalogId, long entityId, PolarisEntityType entityType) { - - // if it exists, we are set - ResolvedPolarisEntity entry = this.getEntityById(entityId); - final boolean cacheHit; - - // we need to load it if it does not exist - if (entry == null) { - // this is a miss - cacheHit = false; - - // load it - ResolvedEntityResult result = - polarisMetaStoreManager.loadResolvedEntityById( - callContext, entityCatalogId, entityId, entityType); - - // not found, exit - if (!result.isSuccess()) { - return null; - } - - // if found, setup entry - callContext.getDiagServices().checkNotNull(result.getEntity(), "entity_should_loaded"); - callContext - .getDiagServices() - .checkNotNull(result.getEntityGrantRecords(), "entity_grant_records_should_loaded"); - entry = - new ResolvedPolarisEntity( - callContext.getDiagServices(), - result.getEntity(), - result.getEntityGrantRecords(), - result.getGrantRecordsVersion()); - - // the above loading could take a long time so check again if the entry exists and only - this.cacheNewEntry(entry); + final AtomicBoolean cacheHit = new AtomicBoolean(true); + + ResolvedPolarisEntity entity = + this.cache.get( + new IdKey(entityId), + () -> { + cacheHit.set(false); Review Comment: It can race with `getAndRefreshIfNeeded`, indeed. But those races only result in * the cache being populated earlier, if `getOrLoadEntityById` goes first * the cache being populated twice, if `getOrLoadEntityById` goes first while the entity was updated by another process * the cache being populated with a fresh-enough entity if `getAndRefreshIfNeeded` goes first It would be possible to remove all race conditions by using a lock over the entire cache. Typically, a `ReentrantReadWriteLock`. 
But depending on the cache update rate, this could have serious performance implications. So, back to the more fundamental question. And really this is all about the guarantees that `EntityCache` currently provides. The `EntityCache` does not have any guarantee about the staleness of entities returned by `getOrLoadEntityBy...`. The only guarantees it provides now are that if an entry exists in the database, it can be fetched using its id or name. At no time should those methods return `null` if an entity exists. However, it guarantees that entities returned by `getAndRefreshIfNeeded` will be invalidated and reloaded from the underlying database if the requested version is higher than the currently cached version. And that is the behaviour that is allowed by the current code. ########## polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java: ########## @@ -18,185 +18,74 @@ */ package org.apache.polaris.core.persistence.cache; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalListener; +import com.google.common.util.concurrent.Striped; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; -import java.util.AbstractMap; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; +import java.time.Duration; +import java.util.Comparator; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisEntityType; -import org.apache.polaris.core.entity.PolarisGrantRecord; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.core.persistence.ResolvedPolarisEntity; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; /** The entity 
cache, can be private or shared */ public class EntityCache { + private static final Comparator<ResolvedPolarisEntity> RESOLVED_POLARIS_ENTITY_COMPARATOR = + Comparator.nullsLast( + Comparator.<ResolvedPolarisEntity>comparingInt(rpe -> rpe.getEntity().getEntityVersion()) + .thenComparingInt(rpe -> rpe.getEntity().getGrantRecordsVersion())); + private static final int STRIPES = 1_024; // cache mode private EntityCacheMode cacheMode; // the meta store manager private final PolarisMetaStoreManager polarisMetaStoreManager; - // Caffeine cache to keep entries by id - private final Cache<Long, ResolvedPolarisEntity> byId; + // Caffeine cache to keep entries + private final IndexedCache<CacheKey, ResolvedPolarisEntity> cache; - // index by name - private final AbstractMap<EntityCacheByNameKey, ResolvedPolarisEntity> byName; + // Locks to ensure that an entity can only be refreshed by one thread at a time + private final Striped<Lock> locks; /** * Constructor. Cache can be private or shared * * @param polarisMetaStoreManager the meta store manager implementation */ public EntityCache(@Nonnull PolarisMetaStoreManager polarisMetaStoreManager) { - - // by name cache - this.byName = new ConcurrentHashMap<>(); - - // When an entry is removed, we simply remove it from the byName map - RemovalListener<Long, ResolvedPolarisEntity> removalListener = - (key, value, cause) -> { - if (value != null) { - // compute name key - EntityCacheByNameKey nameKey = new EntityCacheByNameKey(value.getEntity()); - - // if it is still active, remove it from the name key - this.byName.remove(nameKey, value); - } - }; - - // use a Caffeine cache to purge entries when those have not been used for a long time. - // Assuming 1KB per entry, 100K entries is about 100MB. 
- this.byId = - Caffeine.newBuilder() - .maximumSize(100_000) // Set maximum size to 100,000 elements - .expireAfterAccess(1, TimeUnit.HOURS) // Expire entries after 1 hour of no access - .removalListener(removalListener) // Set the removal listener - .build(); - // remember the meta store manager this.polarisMetaStoreManager = polarisMetaStoreManager; // enabled by default this.cacheMode = EntityCacheMode.ENABLE; + + // Use a Caffeine cache to purge entries when those have not been used for a long time. + // Assuming 1KB per entry, 100K entries is about 100MB. Note that each entity is stored twice in Review Comment: Done ########## polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java: ########## @@ -365,50 +230,46 @@ && isNewer(existingCacheEntry, existingCacheEntryByName)) { long entityCatalogId, long entityId, PolarisEntityType entityType) { - - // if it exists, we are set - ResolvedPolarisEntity entry = this.getEntityById(entityId); - final boolean cacheHit; - - // we need to load it if it does not exist - if (entry == null) { - // this is a miss - cacheHit = false; - - // load it - ResolvedEntityResult result = - polarisMetaStoreManager.loadResolvedEntityById( - callContext, entityCatalogId, entityId, entityType); - - // not found, exit - if (!result.isSuccess()) { - return null; - } - - // if found, setup entry - callContext.getDiagServices().checkNotNull(result.getEntity(), "entity_should_loaded"); - callContext - .getDiagServices() - .checkNotNull(result.getEntityGrantRecords(), "entity_grant_records_should_loaded"); - entry = - new ResolvedPolarisEntity( - callContext.getDiagServices(), - result.getEntity(), - result.getEntityGrantRecords(), - result.getGrantRecordsVersion()); - - // the above loading could take a long time so check again if the entry exists and only - this.cacheNewEntry(entry); + final AtomicBoolean cacheHit = new AtomicBoolean(true); Review Comment: Is there strong preference for `var` in the Polaris 
codebase? ########## polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java: ########## @@ -18,185 +18,74 @@ */ package org.apache.polaris.core.persistence.cache; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalListener; +import com.google.common.util.concurrent.Striped; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; -import java.util.AbstractMap; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; +import java.time.Duration; +import java.util.Comparator; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisEntityType; -import org.apache.polaris.core.entity.PolarisGrantRecord; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.core.persistence.ResolvedPolarisEntity; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; /** The entity cache, can be private or shared */ public class EntityCache { + private static final Comparator<ResolvedPolarisEntity> RESOLVED_POLARIS_ENTITY_COMPARATOR = + Comparator.nullsLast( + Comparator.<ResolvedPolarisEntity>comparingInt(rpe -> rpe.getEntity().getEntityVersion()) + .thenComparingInt(rpe -> rpe.getEntity().getGrantRecordsVersion())); + private static final int STRIPES = 1_024; // cache mode private EntityCacheMode cacheMode; // the meta store manager private final PolarisMetaStoreManager polarisMetaStoreManager; - // Caffeine cache to keep entries by id - private final Cache<Long, ResolvedPolarisEntity> byId; + // Caffeine cache to keep entries + private final IndexedCache<CacheKey, ResolvedPolarisEntity> cache; - // index by name - private final 
AbstractMap<EntityCacheByNameKey, ResolvedPolarisEntity> byName; + // Locks to ensure that an entity can only be refreshed by one thread at a time + private final Striped<Lock> locks; /** * Constructor. Cache can be private or shared * * @param polarisMetaStoreManager the meta store manager implementation */ public EntityCache(@Nonnull PolarisMetaStoreManager polarisMetaStoreManager) { - - // by name cache - this.byName = new ConcurrentHashMap<>(); - - // When an entry is removed, we simply remove it from the byName map - RemovalListener<Long, ResolvedPolarisEntity> removalListener = - (key, value, cause) -> { - if (value != null) { - // compute name key - EntityCacheByNameKey nameKey = new EntityCacheByNameKey(value.getEntity()); - - // if it is still active, remove it from the name key - this.byName.remove(nameKey, value); - } - }; - - // use a Caffeine cache to purge entries when those have not been used for a long time. - // Assuming 1KB per entry, 100K entries is about 100MB. - this.byId = - Caffeine.newBuilder() - .maximumSize(100_000) // Set maximum size to 100,000 elements - .expireAfterAccess(1, TimeUnit.HOURS) // Expire entries after 1 hour of no access - .removalListener(removalListener) // Set the removal listener - .build(); - // remember the meta store manager this.polarisMetaStoreManager = polarisMetaStoreManager; // enabled by default this.cacheMode = EntityCacheMode.ENABLE; + + // Use a Caffeine cache to purge entries when those have not been used for a long time. + // Assuming 1KB per entry, 100K entries is about 100MB. Note that each entity is stored twice in + // the cache, once indexed by its identifier and once indexed by its name. 
+ this.cache = + new IndexedCache.Builder<CacheKey, ResolvedPolarisEntity>() + .primaryKey(e -> new IdKey(e.getEntity().getId())) + .addSecondaryKey(e -> new NameKey(new EntityCacheByNameKey(e.getEntity()))) + .expireAfterWrite(Duration.ofMinutes(5)) + .maximumSize(10_000) + .build(); + this.locks = Striped.lock(STRIPES); } /** - * Remove the specified cache entry from the cache + * Remove the specified cache entry from the cache. This applies both to the cache entry indexed + * by its id and by its name. * * @param cacheEntry cache entry to remove */ public void removeCacheEntry(@Nonnull ResolvedPolarisEntity cacheEntry) { - // compute name key - EntityCacheByNameKey nameKey = new EntityCacheByNameKey(cacheEntry.getEntity()); - - // remove this old entry, this will immediately remove the named entry - this.byId.asMap().remove(cacheEntry.getEntity().getId(), cacheEntry); - - // remove it from the name key - this.byName.remove(nameKey, cacheEntry); - } - - /** - * Cache new entry - * - * @param cacheEntry new cache entry - */ - private void cacheNewEntry(@Nonnull ResolvedPolarisEntity cacheEntry) { - - // compute name key - EntityCacheByNameKey nameKey = new EntityCacheByNameKey(cacheEntry.getEntity()); - - // get old value if one exist - ResolvedPolarisEntity oldCacheEntry = this.byId.getIfPresent(cacheEntry.getEntity().getId()); - - // put new entry, only if really newer one - this.byId - .asMap() - .merge( - cacheEntry.getEntity().getId(), - cacheEntry, - (oldValue, newValue) -> this.isNewer(newValue, oldValue) ? newValue : oldValue); - - // only update the name key if this entity was not dropped - if (!cacheEntry.getEntity().isDropped()) { - // here we don't really care about concurrent update to the key. 
Basically if we are - // pointing to the wrong entry, we will detect this and fix the issue - this.byName.put(nameKey, cacheEntry); - } - - // remove old name if it has changed - if (oldCacheEntry != null) { - // old name - EntityCacheByNameKey oldNameKey = new EntityCacheByNameKey(oldCacheEntry.getEntity()); - if (!oldNameKey.equals(nameKey)) { - this.byName.remove(oldNameKey, oldCacheEntry); - } - } - } - - /** - * Determine if the newer value is really newer - * - * @param newValue new cache entry - * @param oldValue old cache entry - * @return true if the newer cache entry - */ - private boolean isNewer(ResolvedPolarisEntity newValue, ResolvedPolarisEntity oldValue) { - return (newValue.getEntity().getEntityVersion() > oldValue.getEntity().getEntityVersion() - || newValue.getEntity().getGrantRecordsVersion() - > oldValue.getEntity().getGrantRecordsVersion()); - } - - /** - * Replace an old entry with a new one - * - * @param oldCacheEntry old entry - * @param newCacheEntry new entry - */ - private void replaceCacheEntry( - @Nullable ResolvedPolarisEntity oldCacheEntry, @Nonnull ResolvedPolarisEntity newCacheEntry) { - - // need to remove old? 
- if (oldCacheEntry != null) { - // only replace if there is a difference - if (this.entityNameKeyMismatch(oldCacheEntry.getEntity(), newCacheEntry.getEntity()) - || oldCacheEntry.getEntity().getEntityVersion() - < newCacheEntry.getEntity().getEntityVersion() - || oldCacheEntry.getEntity().getGrantRecordsVersion() - < newCacheEntry.getEntity().getGrantRecordsVersion()) { - // write new one - this.cacheNewEntry(newCacheEntry); - - // delete the old one assuming it has not been replaced by the above new entry - this.removeCacheEntry(oldCacheEntry); - } - } else { - // write new one - this.cacheNewEntry(newCacheEntry); - } - } - - /** - * Check if two entities have different cache keys (either by id or by name) - * - * @param entity the entity - * @param otherEntity the other entity - * @return true if there is a mismatch - */ - private boolean entityNameKeyMismatch( - @Nonnull PolarisBaseEntity entity, @Nonnull PolarisBaseEntity otherEntity) { - return entity.getId() != otherEntity.getId() - || entity.getParentId() != otherEntity.getParentId() - || !entity.getName().equals(otherEntity.getName()) - || entity.getTypeCode() != otherEntity.getTypeCode(); + IdKey key = new IdKey(cacheEntry.getEntity().getId()); + this.cache.invalidate(key); Review Comment: I am not sure about this one. The `locks` field is used to protect the read-then-write section in `getAndRefreshIfNeeded()`. All the other accesses to the cache are done using the entity id and are thread-safe. Am I missing something? 
########## polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java: ########## @@ -255,104 +150,74 @@ public void setCacheMode(EntityCacheMode cacheMode) { @Nonnull PolarisBaseEntity entityToValidate, int entityMinVersion, int entityGrantRecordsMinVersion) { - long entityCatalogId = entityToValidate.getCatalogId(); long entityId = entityToValidate.getId(); - PolarisEntityType entityType = entityToValidate.getType(); - // first lookup the cache to find the existing cache entry + // First lookup the cache to find the existing cache entry ResolvedPolarisEntity existingCacheEntry = this.getEntityById(entityId); - // the caller's fetched entity may have come from a stale lookup byName; we should consider - // the existingCacheEntry to be the older of the two for purposes of invalidation to make - // sure when we replaceCacheEntry we're also removing the old name if it's no longer valid - EntityCacheByNameKey nameKey = new EntityCacheByNameKey(entityToValidate); - ResolvedPolarisEntity existingCacheEntryByName = this.getEntityByName(nameKey); - if (existingCacheEntryByName != null - && existingCacheEntry != null - && isNewer(existingCacheEntry, existingCacheEntryByName)) { - existingCacheEntry = existingCacheEntryByName; + // See if we need to load or refresh that entity + if (existingCacheEntry != null + && existingCacheEntry.getEntity().getEntityVersion() >= entityMinVersion + && existingCacheEntry.getEntity().getGrantRecordsVersion() + >= entityGrantRecordsMinVersion) { + // Cache hit and cache entry is up to date, nothing to do. 
+ return existingCacheEntry; } - // the new one to be returned - final ResolvedPolarisEntity newCacheEntry; - - // see if we need to load or refresh that entity - if (existingCacheEntry == null - || existingCacheEntry.getEntity().getEntityVersion() < entityMinVersion - || existingCacheEntry.getEntity().getGrantRecordsVersion() < entityGrantRecordsMinVersion) { + // Either cache miss, dropped entity or stale entity. In either case, invalidate and reload it. + // Do the refresh in a critical section based on the entity ID to avoid race conditions. + Lock lock = this.locks.get(entityId); + try { + lock.lock(); + + // Lookup the cache again in case another thread has already invalidated it. + existingCacheEntry = this.getEntityById(entityId); + + // See if the entity has been refreshed concurrently + if (existingCacheEntry != null + && existingCacheEntry.getEntity().getEntityVersion() >= entityMinVersion + && existingCacheEntry.getEntity().getGrantRecordsVersion() + >= entityGrantRecordsMinVersion) { + // Cache hit and cache entry is up to date now, exit + return existingCacheEntry; + } Review Comment: Yes, this block makes sense, but the "why" is far from trivial. See this jcstress report without the duplicated `getEntityById() ... if refreshed, return` block you commented on.  The `3, 4` and `3, 5` cases are not that bad. They are just unnecessary calls to the database and should be marked as `Interesting`, really. But the `3, 2` case is definitely wrong. This race condition exists because of a combination of factors: 1. The `IndexedCache` instance can be accessed outside of the lock (by design) 2. Even though its two underlying data structures (Caffeine cache and `ConcurrentHashMap`) *are* thread safe, the combination of both is not. Thus it is possible to observe an updated `indexes` CHM before the cache itself has been updated. The `locks` are there to protect the read-then-write requirement of `getAndRefreshIfNeeded`. 
It is the only place where such a pattern happens. That pattern is very tricky to handle properly (and should probably not exist, if you ask me). And the only way I could find to support it is to have a double-checked locking idiom. An alternative could be to use much more coarse-grained locks but it would degrade performance by a lot. The `Indexed` cache could also expose its `locks` using methods `lock(id)` and `unlock(id)`, but then the caller would have to know exactly how to use it, at the risk of causing a full deadlock. ########## polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/IndexedCache.java: ########## @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.persistence.cache; Review Comment: I am kind of -1 for moving it to a different package as it would mean having to make the cache and the cache key objects public. I guess it depends on the development practices of the project though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
