snazy commented on code in PR #1339:
URL: https://github.com/apache/polaris/pull/1339#discussion_r2033787597
##########
gradle/libs.versions.toml:
##########
@@ -97,6 +97,7 @@ threeten-extra = { module = "org.threeten:threeten-extra", version = "1.8.0" }
[plugins]
jandex = { id = "org.kordamp.gradle.jandex", version = "2.1.0" }
+jcstress = { id = "io.github.reyerizo.gradle.jcstress", version = "0.8.15" }
Review Comment:
You probably also want this in `[libraries]`:
```
jcstress-core = { module = "org.openjdk.jcstress:jcstress-core", version = "0.16" }
```
plus the following in the build script of the project where jcstress is used:
```
tasks.named("jcstressJar") { dependsOn("jandex") }
tasks.named("compileJcstressJava") { dependsOn("jandex") }
tasks.named("check") { dependsOn("jcstress") }
jcstress { jcstressDependency = libs.jcstress.core.get().toString() }
```
##########
polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/IndexedCache.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.persistence.cache;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Ticker;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Striped;
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.LinkedHashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+// Source:
+// https://github.com/ben-manes/caffeine/blob/master/examples/indexable/src/main/java/com/github/benmanes/caffeine/examples/indexable/IndexedCache.java
Review Comment:
This file needs the "magic" `CODE_COPIED_TO_POLARIS` marker, plus an entry in `/LICENSE` covering this file and its source project.
##########
polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java:
##########
@@ -365,50 +230,46 @@ && isNewer(existingCacheEntry, existingCacheEntryByName)) {
long entityCatalogId,
long entityId,
PolarisEntityType entityType) {
-
- // if it exists, we are set
- ResolvedPolarisEntity entry = this.getEntityById(entityId);
- final boolean cacheHit;
-
- // we need to load it if it does not exist
- if (entry == null) {
- // this is a miss
- cacheHit = false;
-
- // load it
- ResolvedEntityResult result =
- polarisMetaStoreManager.loadResolvedEntityById(
- callContext, entityCatalogId, entityId, entityType);
-
- // not found, exit
- if (!result.isSuccess()) {
- return null;
- }
-
- // if found, setup entry
- callContext.getDiagServices().checkNotNull(result.getEntity(), "entity_should_loaded");
- callContext
- .getDiagServices()
- .checkNotNull(result.getEntityGrantRecords(), "entity_grant_records_should_loaded");
- entry =
- new ResolvedPolarisEntity(
- callContext.getDiagServices(),
- result.getEntity(),
- result.getEntityGrantRecords(),
- result.getGrantRecordsVersion());
-
- // the above loading could take a long time so check again if the entry exists and only
- this.cacheNewEntry(entry);
+ final AtomicBoolean cacheHit = new AtomicBoolean(true);
Review Comment:
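Nit: `final` is redundant here. `cacheHit` is never reassigned, so it is effectively final, which is all the lambda capture needs.
It could just be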
```suggestion
var cacheHit = new AtomicBoolean(true);
```
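To illustrate the nit: a lambda may capture any local variable that is never reassigned, whether or not it is declared `final`; the flag is flipped by mutating the `AtomicBoolean`, not the variable. The class and method names below are hypothetical; only `var cacheHit = new AtomicBoolean(true)` mirrors the diff.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

final class EffectivelyFinalDemo {
  /** Returns true if the loader ran, mirroring how the diff flips cacheHit inside the loader. */
  static boolean loaderRan(boolean alreadyCached) {
    // Never reassigned, so effectively final (JLS §4.12.4) and capturable without `final`.
    var cacheHit = new AtomicBoolean(true);
    Supplier<String> loader =
        () -> {
          cacheHit.set(false); // mutates the object; the variable itself is unchanged
          return "loaded";
        };
    if (!alreadyCached) {
      loader.get();
    }
    return !cacheHit.get();
  }
}
```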
##########
polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java:
##########
@@ -365,50 +230,46 @@ && isNewer(existingCacheEntry, existingCacheEntryByName)) {
long entityCatalogId,
long entityId,
PolarisEntityType entityType) {
-
- // if it exists, we are set
- ResolvedPolarisEntity entry = this.getEntityById(entityId);
- final boolean cacheHit;
-
- // we need to load it if it does not exist
- if (entry == null) {
- // this is a miss
- cacheHit = false;
-
- // load it
- ResolvedEntityResult result =
- polarisMetaStoreManager.loadResolvedEntityById(
- callContext, entityCatalogId, entityId, entityType);
-
- // not found, exit
- if (!result.isSuccess()) {
- return null;
- }
-
- // if found, setup entry
- callContext.getDiagServices().checkNotNull(result.getEntity(), "entity_should_loaded");
- callContext
- .getDiagServices()
- .checkNotNull(result.getEntityGrantRecords(), "entity_grant_records_should_loaded");
- entry =
- new ResolvedPolarisEntity(
- callContext.getDiagServices(),
- result.getEntity(),
- result.getEntityGrantRecords(),
- result.getGrantRecordsVersion());
-
- // the above loading could take a long time so check again if the entry exists and only
- this.cacheNewEntry(entry);
+ final AtomicBoolean cacheHit = new AtomicBoolean(true);
+
+ ResolvedPolarisEntity entity =
+ this.cache.get(
+ new IdKey(entityId),
+ () -> {
+ cacheHit.set(false);
Review Comment:
I think this also needs to be guarded via `this.locks`; otherwise it can race with `getAndRefreshIfNeeded`. (Then we don't need the loader function, and we avoid forcing a `synchronized` lock in the `ConcurrentHashMap` (CHM).)
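A minimal, JDK-only sketch of the suggestion, assuming the load-on-miss path simply takes the same per-id lock as the refresh path. A plain `Lock[]` stands in for Guava's `Striped<Lock>`, a `ConcurrentHashMap` stands in for `IndexedCache`, and `LockGuardedCache`, `getOrLoad`, and `refresh` are hypothetical names. The loader runs under the id's lock rather than inside a map compute function, so no map bin lock is held during the (possibly slow) metastore call.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongFunction;

/** Hypothetical sketch: load-on-miss and refresh for the same id serialize on one lock. */
final class LockGuardedCache<V> {
  private final ConcurrentHashMap<Long, V> byId = new ConcurrentHashMap<>();
  // JDK-only stand-in for Guava's Striped.lock(n): a fixed array of locks chosen by hash.
  private final Lock[] stripes;

  LockGuardedCache(int stripeCount) {
    stripes = new Lock[stripeCount];
    for (int i = 0; i < stripeCount; i++) {
      stripes[i] = new ReentrantLock();
    }
  }

  private Lock lockFor(long id) {
    return stripes[Math.floorMod(Long.hashCode(id), stripes.length)];
  }

  /** Load-on-miss path: the loader runs under the id's lock, not inside a map compute. */
  V getOrLoad(long id, LongFunction<V> loader, AtomicBoolean cacheHit) {
    V value = byId.get(id);
    if (value != null) {
      cacheHit.set(true);
      return value;
    }
    Lock lock = lockFor(id);
    lock.lock(); // acquire before try, so unlock() only runs if we hold the lock
    try {
      value = byId.get(id); // another thread may have loaded it while we waited
      cacheHit.set(value != null);
      if (value == null) {
        value = loader.apply(id);
        if (value != null) {
          byId.put(id, value);
        }
      }
      return value;
    } finally {
      lock.unlock();
    }
  }

  /** Refresh path (cf. getAndRefreshIfNeeded): same lock, so it cannot interleave with a load. */
  V refresh(long id, LongFunction<V> loader) {
    Lock lock = lockFor(id);
    lock.lock();
    try {
      V fresh = loader.apply(id);
      if (fresh == null) {
        byId.remove(id); // e.g. the entity was dropped
      } else {
        byId.put(id, fresh);
      }
      return fresh;
    } finally {
      lock.unlock();
    }
  }
}
```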
##########
polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java:
##########
@@ -18,185 +18,74 @@
*/
package org.apache.polaris.core.persistence.cache;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.RemovalListener;
+import com.google.common.util.concurrent.Striped;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
-import java.util.AbstractMap;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
+import java.util.Comparator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisEntityType;
-import org.apache.polaris.core.entity.PolarisGrantRecord;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
/** The entity cache, can be private or shared */
public class EntityCache {
+ private static final Comparator<ResolvedPolarisEntity> RESOLVED_POLARIS_ENTITY_COMPARATOR =
+ Comparator.nullsLast(
+ Comparator.<ResolvedPolarisEntity>comparingInt(rpe -> rpe.getEntity().getEntityVersion())
+ .thenComparingInt(rpe -> rpe.getEntity().getGrantRecordsVersion()));
Review Comment:
Is this being used?
##########
polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java:
##########
@@ -255,104 +150,74 @@ public void setCacheMode(EntityCacheMode cacheMode) {
@Nonnull PolarisBaseEntity entityToValidate,
int entityMinVersion,
int entityGrantRecordsMinVersion) {
- long entityCatalogId = entityToValidate.getCatalogId();
long entityId = entityToValidate.getId();
- PolarisEntityType entityType = entityToValidate.getType();
- // first lookup the cache to find the existing cache entry
+ // First lookup the cache to find the existing cache entry
ResolvedPolarisEntity existingCacheEntry = this.getEntityById(entityId);
- // the caller's fetched entity may have come from a stale lookup byName; we should consider
- // the existingCacheEntry to be the older of the two for purposes of invalidation to make
- // sure when we replaceCacheEntry we're also removing the old name if it's no longer valid
- EntityCacheByNameKey nameKey = new EntityCacheByNameKey(entityToValidate);
- ResolvedPolarisEntity existingCacheEntryByName = this.getEntityByName(nameKey);
- if (existingCacheEntryByName != null
- && existingCacheEntry != null
- && isNewer(existingCacheEntry, existingCacheEntryByName)) {
- existingCacheEntry = existingCacheEntryByName;
+ // See if we need to load or refresh that entity
+ if (existingCacheEntry != null
+ && existingCacheEntry.getEntity().getEntityVersion() >= entityMinVersion
+ && existingCacheEntry.getEntity().getGrantRecordsVersion()
+ >= entityGrantRecordsMinVersion) {
+ // Cache hit and cache entry is up to date, nothing to do.
+ return existingCacheEntry;
}
- // the new one to be returned
- final ResolvedPolarisEntity newCacheEntry;
-
- // see if we need to load or refresh that entity
- if (existingCacheEntry == null
- || existingCacheEntry.getEntity().getEntityVersion() < entityMinVersion
- || existingCacheEntry.getEntity().getGrantRecordsVersion() < entityGrantRecordsMinVersion) {
+ // Either cache miss, dropped entity or stale entity. In either case, invalidate and reload it.
+ // Do the refresh in a critical section based on the entity ID to avoid race conditions.
+ Lock lock = this.locks.get(entityId);
+ try {
+ lock.lock();
+
+ // Lookup the cache again in case another thread has already invalidated it.
+ existingCacheEntry = this.getEntityById(entityId);
+
+ // See if the entity has been refreshed concurrently
+ if (existingCacheEntry != null
+ && existingCacheEntry.getEntity().getEntityVersion() >= entityMinVersion
+ && existingCacheEntry.getEntity().getGrantRecordsVersion()
+ >= entityGrantRecordsMinVersion) {
+ // Cache hit and cache entry is up to date now, exit
+ return existingCacheEntry;
+ }
Review Comment:
Does this block make sense? I mean, does `getEntityById()` often yield a concurrently loaded result here?
Not a major thing, just asking.
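To make the question concrete: the re-check only fires when two lookups for the same id both miss the lock-free check and the second one waits on the lock while the first is still loading. The JDK-only sketch below forces exactly that interleaving. `RecheckDemo` is hypothetical, a single `ReentrantLock` stands in for `this.locks.get(entityId)`, a `String` stands in for `ResolvedPolarisEntity`, and the version comparison from the diff is reduced to a presence check.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical demo: two threads miss on the same id; the re-check under the lock saves a load. */
final class RecheckDemo {
  final ConcurrentHashMap<Long, String> cache = new ConcurrentHashMap<>();
  final ReentrantLock lock = new ReentrantLock(); // stands in for locks.get(id)
  final AtomicInteger loads = new AtomicInteger();
  final CountDownLatch releaseLoader = new CountDownLatch(1);

  String getOrLoad(long id) throws InterruptedException {
    String v = cache.get(id);
    if (v != null) {
      return v; // fast path, no lock
    }
    lock.lock();
    try {
      v = cache.get(id); // the re-check under discussion
      if (v != null) {
        return v; // another thread loaded it while we waited for the lock
      }
      loads.incrementAndGet();
      releaseLoader.await(); // simulate a slow metastore round trip
      v = "entity-" + id;
      cache.put(id, v);
      return v;
    } finally {
      lock.unlock();
    }
  }

  /** Runs two overlapping lookups for the same id and returns how many loads happened. */
  static int run() throws InterruptedException {
    RecheckDemo demo = new RecheckDemo();
    Thread first = new Thread(() -> call(demo));
    first.start();
    // Wait until the first thread is inside the loader, holding the lock.
    while (demo.loads.get() == 0) {
      Thread.onSpinWait();
    }
    Thread second = new Thread(() -> call(demo));
    second.start();
    // Wait until the second thread has missed the fast path and is blocked on the lock.
    while (!demo.lock.hasQueuedThread(second)) {
      Thread.onSpinWait();
    }
    demo.releaseLoader.countDown();
    first.join();
    second.join();
    return demo.loads.get();
  }

  private static void call(RecheckDemo demo) {
    try {
      demo.getOrLoad(7L);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

This shows when the branch saves a duplicate load (overlapping misses on the same id during a slow load), not how often that interleaving occurs in practice; answering that would need measurement under real traffic.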
##########
bom/build.gradle.kts:
##########
@@ -50,5 +50,6 @@ dependencies {
api(project(":polaris-quarkus-spark-tests"))
api(project(":polaris-tests"))
+ api(project(":polaris-jcstress-tests"))
Review Comment:
Why not add the jcstress tests to the project that implements the cache?
##########
polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java:
##########
@@ -18,185 +18,74 @@
*/
package org.apache.polaris.core.persistence.cache;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.RemovalListener;
+import com.google.common.util.concurrent.Striped;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
-import java.util.AbstractMap;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
+import java.util.Comparator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisEntityType;
-import org.apache.polaris.core.entity.PolarisGrantRecord;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
/** The entity cache, can be private or shared */
public class EntityCache {
+ private static final Comparator<ResolvedPolarisEntity> RESOLVED_POLARIS_ENTITY_COMPARATOR =
+ Comparator.nullsLast(
+ Comparator.<ResolvedPolarisEntity>comparingInt(rpe -> rpe.getEntity().getEntityVersion())
+ .thenComparingInt(rpe -> rpe.getEntity().getGrantRecordsVersion()));
+ private static final int STRIPES = 1_024;
// cache mode
private EntityCacheMode cacheMode;
// the meta store manager
private final PolarisMetaStoreManager polarisMetaStoreManager;
- // Caffeine cache to keep entries by id
- private final Cache<Long, ResolvedPolarisEntity> byId;
+ // Caffeine cache to keep entries
+ private final IndexedCache<CacheKey, ResolvedPolarisEntity> cache;
- // index by name
- private final AbstractMap<EntityCacheByNameKey, ResolvedPolarisEntity> byName;
+ // Locks to ensure that an entity can only be refreshed by one thread at a time
+ private final Striped<Lock> locks;
/**
* Constructor. Cache can be private or shared
*
* @param polarisMetaStoreManager the meta store manager implementation
*/
public EntityCache(@Nonnull PolarisMetaStoreManager polarisMetaStoreManager) {
-
- // by name cache
- this.byName = new ConcurrentHashMap<>();
-
- // When an entry is removed, we simply remove it from the byName map
- RemovalListener<Long, ResolvedPolarisEntity> removalListener =
- (key, value, cause) -> {
- if (value != null) {
- // compute name key
- EntityCacheByNameKey nameKey = new EntityCacheByNameKey(value.getEntity());
-
- // if it is still active, remove it from the name key
- this.byName.remove(nameKey, value);
- }
- };
-
- // use a Caffeine cache to purge entries when those have not been used for a long time.
- // Assuming 1KB per entry, 100K entries is about 100MB.
- this.byId =
- Caffeine.newBuilder()
- .maximumSize(100_000) // Set maximum size to 100,000 elements
- .expireAfterAccess(1, TimeUnit.HOURS) // Expire entries after 1 hour of no access
- .removalListener(removalListener) // Set the removal listener
- .build();
-
// remember the meta store manager
this.polarisMetaStoreManager = polarisMetaStoreManager;
// enabled by default
this.cacheMode = EntityCacheMode.ENABLE;
+
+ // Use a Caffeine cache to purge entries when those have not been used for a long time.
+ // Assuming 1KB per entry, 100K entries is about 100MB. Note that each entity is stored twice in
Review Comment:
The comment says 100K entries, but `maximumSize` is set to `10_000` ;)
##########
polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java:
##########
@@ -18,185 +18,74 @@
*/
package org.apache.polaris.core.persistence.cache;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.RemovalListener;
+import com.google.common.util.concurrent.Striped;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
-import java.util.AbstractMap;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
+import java.util.Comparator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisEntityType;
-import org.apache.polaris.core.entity.PolarisGrantRecord;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
/** The entity cache, can be private or shared */
public class EntityCache {
+ private static final Comparator<ResolvedPolarisEntity> RESOLVED_POLARIS_ENTITY_COMPARATOR =
+ Comparator.nullsLast(
+ Comparator.<ResolvedPolarisEntity>comparingInt(rpe -> rpe.getEntity().getEntityVersion())
+ .thenComparingInt(rpe -> rpe.getEntity().getGrantRecordsVersion()));
+ private static final int STRIPES = 1_024;
// cache mode
private EntityCacheMode cacheMode;
// the meta store manager
private final PolarisMetaStoreManager polarisMetaStoreManager;
- // Caffeine cache to keep entries by id
- private final Cache<Long, ResolvedPolarisEntity> byId;
+ // Caffeine cache to keep entries
+ private final IndexedCache<CacheKey, ResolvedPolarisEntity> cache;
- // index by name
- private final AbstractMap<EntityCacheByNameKey, ResolvedPolarisEntity> byName;
+ // Locks to ensure that an entity can only be refreshed by one thread at a time
+ private final Striped<Lock> locks;
/**
* Constructor. Cache can be private or shared
*
* @param polarisMetaStoreManager the meta store manager implementation
*/
public EntityCache(@Nonnull PolarisMetaStoreManager polarisMetaStoreManager) {
-
- // by name cache
- this.byName = new ConcurrentHashMap<>();
-
- // When an entry is removed, we simply remove it from the byName map
- RemovalListener<Long, ResolvedPolarisEntity> removalListener =
- (key, value, cause) -> {
- if (value != null) {
- // compute name key
- EntityCacheByNameKey nameKey = new EntityCacheByNameKey(value.getEntity());
-
- // if it is still active, remove it from the name key
- this.byName.remove(nameKey, value);
- }
- };
-
- // use a Caffeine cache to purge entries when those have not been used for a long time.
- // Assuming 1KB per entry, 100K entries is about 100MB.
- this.byId =
- Caffeine.newBuilder()
- .maximumSize(100_000) // Set maximum size to 100,000 elements
- .expireAfterAccess(1, TimeUnit.HOURS) // Expire entries after 1 hour of no access
- .removalListener(removalListener) // Set the removal listener
- .build();
-
// remember the meta store manager
this.polarisMetaStoreManager = polarisMetaStoreManager;
// enabled by default
this.cacheMode = EntityCacheMode.ENABLE;
+
+ // Use a Caffeine cache to purge entries when those have not been used for a long time.
+ // Assuming 1KB per entry, 100K entries is about 100MB. Note that each entity is stored twice in
+ // the cache, once indexed by its identifier and once indexed by its name.
+ this.cache =
+ new IndexedCache.Builder<CacheKey, ResolvedPolarisEntity>()
+ .primaryKey(e -> new IdKey(e.getEntity().getId()))
+ .addSecondaryKey(e -> new NameKey(new EntityCacheByNameKey(e.getEntity())))
+ .expireAfterWrite(Duration.ofMinutes(5))
+ .maximumSize(10_000)
+ .build();
+ this.locks = Striped.lock(STRIPES);
}
/**
- * Remove the specified cache entry from the cache
+ * Remove the specified cache entry from the cache. This applies both to the cache entry indexed
+ * by its id and by its name.
*
* @param cacheEntry cache entry to remove
*/
public void removeCacheEntry(@Nonnull ResolvedPolarisEntity cacheEntry) {
- // compute name key
- EntityCacheByNameKey nameKey = new EntityCacheByNameKey(cacheEntry.getEntity());
-
- // remove this old entry, this will immediately remove the named entry
- this.byId.asMap().remove(cacheEntry.getEntity().getId(), cacheEntry);
-
- // remove it from the name key
- this.byName.remove(nameKey, cacheEntry);
- }
-
- /**
- * Cache new entry
- *
- * @param cacheEntry new cache entry
- */
- private void cacheNewEntry(@Nonnull ResolvedPolarisEntity cacheEntry) {
-
- // compute name key
- EntityCacheByNameKey nameKey = new EntityCacheByNameKey(cacheEntry.getEntity());
-
- // get old value if one exist
- ResolvedPolarisEntity oldCacheEntry = this.byId.getIfPresent(cacheEntry.getEntity().getId());
-
- // put new entry, only if really newer one
- this.byId
- .asMap()
- .merge(
- cacheEntry.getEntity().getId(),
- cacheEntry,
- (oldValue, newValue) -> this.isNewer(newValue, oldValue) ? newValue : oldValue);
-
- // only update the name key if this entity was not dropped
- if (!cacheEntry.getEntity().isDropped()) {
- // here we don't really care about concurrent update to the key. Basically if we are
- // pointing to the wrong entry, we will detect this and fix the issue
- this.byName.put(nameKey, cacheEntry);
- }
-
- // remove old name if it has changed
- if (oldCacheEntry != null) {
- // old name
- EntityCacheByNameKey oldNameKey = new EntityCacheByNameKey(oldCacheEntry.getEntity());
- if (!oldNameKey.equals(nameKey)) {
- this.byName.remove(oldNameKey, oldCacheEntry);
- }
- }
- }
-
- /**
- * Determine if the newer value is really newer
- *
- * @param newValue new cache entry
- * @param oldValue old cache entry
- * @return true if the newer cache entry
- */
- private boolean isNewer(ResolvedPolarisEntity newValue, ResolvedPolarisEntity oldValue) {
- return (newValue.getEntity().getEntityVersion() > oldValue.getEntity().getEntityVersion()
- || newValue.getEntity().getGrantRecordsVersion()
- > oldValue.getEntity().getGrantRecordsVersion());
- }
-
- /**
- * Replace an old entry with a new one
- *
- * @param oldCacheEntry old entry
- * @param newCacheEntry new entry
- */
- private void replaceCacheEntry(
- @Nullable ResolvedPolarisEntity oldCacheEntry, @Nonnull ResolvedPolarisEntity newCacheEntry) {
-
- // need to remove old?
- if (oldCacheEntry != null) {
- // only replace if there is a difference
- if (this.entityNameKeyMismatch(oldCacheEntry.getEntity(), newCacheEntry.getEntity())
- || oldCacheEntry.getEntity().getEntityVersion()
- < newCacheEntry.getEntity().getEntityVersion()
- || oldCacheEntry.getEntity().getGrantRecordsVersion()
- < newCacheEntry.getEntity().getGrantRecordsVersion()) {
- // write new one
- this.cacheNewEntry(newCacheEntry);
-
- // delete the old one assuming it has not been replaced by the above new entry
- this.removeCacheEntry(oldCacheEntry);
- }
- } else {
- // write new one
- this.cacheNewEntry(newCacheEntry);
- }
- }
-
- /**
- * Check if two entities have different cache keys (either by id or by name)
- *
- * @param entity the entity
- * @param otherEntity the other entity
- * @return true if there is a mismatch
- */
- private boolean entityNameKeyMismatch(
- @Nonnull PolarisBaseEntity entity, @Nonnull PolarisBaseEntity otherEntity) {
- return entity.getId() != otherEntity.getId()
- || entity.getParentId() != otherEntity.getParentId()
- || !entity.getName().equals(otherEntity.getName())
- || entity.getTypeCode() != otherEntity.getTypeCode();
+ IdKey key = new IdKey(cacheEntry.getEntity().getId());
+ this.cache.invalidate(key);
Review Comment:
This probably also needs to be guarded via `this.locks`
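A JDK-only sketch of guarding removal with the same per-id lock as the refresh path. It also keeps the pre-PR behaviour of removing only if the cached value is still the entry the caller passed (the removed code used `this.byId.asMap().remove(id, cacheEntry)`), so in this sketch a caller holding a stale entry cannot evict a fresher one. `GuardedRemoval`, `refresh`, and `removeCacheEntry(long, String)` are hypothetical; `String` stands in for `ResolvedPolarisEntity` and a `Lock[]` for Guava's `Striped<Lock>`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch: removal and refresh of one id serialize on the same striped lock. */
final class GuardedRemoval {
  // JDK-only stand-in for Guava's Striped<Lock>.
  private final Lock[] stripes = new Lock[64];
  private final ConcurrentHashMap<Long, String> byId = new ConcurrentHashMap<>();

  GuardedRemoval() {
    for (int i = 0; i < stripes.length; i++) {
      stripes[i] = new ReentrantLock();
    }
  }

  private Lock lockFor(long id) {
    return stripes[Math.floorMod(Long.hashCode(id), stripes.length)];
  }

  /** Refresh path: replace the entry while holding the id's lock. */
  void refresh(long id, String newEntry) {
    Lock lock = lockFor(id);
    lock.lock();
    try {
      byId.put(id, newEntry);
    } finally {
      lock.unlock();
    }
  }

  /** Removes the entry under the id's lock, and only if it is still the one the caller saw. */
  boolean removeCacheEntry(long id, String expectedEntry) {
    Lock lock = lockFor(id);
    lock.lock();
    try {
      return byId.remove(id, expectedEntry);
    } finally {
      lock.unlock();
    }
  }

  String get(long id) {
    return byId.get(id);
  }
}
```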