pingtimeout commented on code in PR #1339: URL: https://github.com/apache/polaris/pull/1339#discussion_r2035693488
########## polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/IndexedCache.java: ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.persistence.cache; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Ticker; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Striped; +import java.time.Duration; +import java.util.ArrayDeque; +import java.util.Comparator; +import java.util.Deque; +import java.util.LinkedHashSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.function.Function; +import java.util.function.Supplier; + +// CODE_COPIED_TO_POLARIS +// Source: +// https://github.com/ben-manes/caffeine/blob/master/examples/indexable/src/main/java/com/github/benmanes/caffeine/examples/indexable/IndexedCache.java + +/** + * A cache abstraction that allows the entry to looked up by alternative keys. This approach mirrors + * a database table where a row is stored by its primary key, it contains all of the columns that + * identify it, and the unique indexes are additional mappings defined by the column mappings. This + * class similarly stores the in the value once in the cache by its primary key and maintains a + * secondary mapping for lookups by using indexing functions to derive the keys. + * + * @author [email protected] (Ben Manes) + */ +final class IndexedCache<K, V> { + private final ConcurrentMap<K, LinkedHashSet<K>> indexes; + private final LinkedHashSet<Function<V, K>> indexers; + private final Striped<Lock> locks; + private final Cache<K, V> store; + + private IndexedCache( + Caffeine<Object, Object> cacheBuilder, LinkedHashSet<Function<V, K>> indexers) { + this.indexes = new ConcurrentHashMap<>(); + this.locks = Striped.lock(1_024); + this.indexers = indexers; + this.store = + cacheBuilder + .evictionListener((key, value, cause) -> indexes.keySet().removeAll(indexes.get(key))) + .build(); + } + + /** Returns the value associated with the key or {@code null} if not found. */ + V getIfPresent(K key) { + var index = indexes.get(key); + return (index == null) ? null : store.getIfPresent(index.iterator().next()); + } + + /** + * Returns the value associated with the key, obtaining that value from the {@code loader} if + * necessary. The entire method invocation is performed atomically, so the function is applied at + * most once per key. As the value may be looked up by alternative keys, those function + * invocations may be executed in parallel and will replace any existing mappings when completed. + */ + V get(K key, Supplier<V> loader) { + var value = getIfPresent(key); + if (value != null) { + return value; + } + + var lock = locks.get(key); + lock.lock(); + try { + value = getIfPresent(key); + if (value != null) { + return value; + } + + value = loader.get(); + if (value == null) { + return null; + } + + put(value); + return value; + } finally { + lock.unlock(); + } + } + + /** Associates the {@code value} with its keys, replacing the old value and keys if present. */ + private void put(V value) { + putIfNewer(value, (oldValue, newValue) -> 1); + } + + /** + * Associates the {@code newValue} with its keys if it is newer than the existing value according + * to the provided {@code comparator}. If the new value is added, the old value and its associated + * keys are replaced. + * + * @param newValue the new value to be added to the cache + * @param comparator a comparator to determine if the new value is newer than the existing value. + * The comparator must support `null` values in case no value exist in the cache prior to + * invocation. + */ + private void putIfNewer(V newValue, Comparator<V> comparator) { + requireNonNull(newValue); + var index = buildIndex(newValue); + store + .asMap() + .compute( + index.iterator().next(), + (key, oldValue) -> { + if (comparator.compare(oldValue, newValue) > 0) { + if (oldValue != null) { + indexes + .keySet() + .removeAll(Sets.difference(indexes.get(index.iterator().next()), index)); + } + for (var indexKey : index) { + indexes.put(indexKey, index); + } + return newValue; + } else { + return oldValue; Review Comment: My bad, another leftover from the attempt at moving the entity version comparison logic to the IndexedCache... ########## polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/IndexedCache.java: ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.persistence.cache; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Ticker; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Striped; +import java.time.Duration; +import java.util.ArrayDeque; +import java.util.Comparator; +import java.util.Deque; +import java.util.LinkedHashSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.function.Function; +import java.util.function.Supplier; + +// CODE_COPIED_TO_POLARIS +// Source: +// https://github.com/ben-manes/caffeine/blob/master/examples/indexable/src/main/java/com/github/benmanes/caffeine/examples/indexable/IndexedCache.java + +/** + * A cache abstraction that allows the entry to looked up by alternative keys. This approach mirrors + * a database table where a row is stored by its primary key, it contains all of the columns that + * identify it, and the unique indexes are additional mappings defined by the column mappings. This + * class similarly stores the in the value once in the cache by its primary key and maintains a + * secondary mapping for lookups by using indexing functions to derive the keys. + * + * @author [email protected] (Ben Manes) + */ +final class IndexedCache<K, V> { + private final ConcurrentMap<K, LinkedHashSet<K>> indexes; + private final LinkedHashSet<Function<V, K>> indexers; + private final Striped<Lock> locks; + private final Cache<K, V> store; + + private IndexedCache( + Caffeine<Object, Object> cacheBuilder, LinkedHashSet<Function<V, K>> indexers) { + this.indexes = new ConcurrentHashMap<>(); + this.locks = Striped.lock(1_024); + this.indexers = indexers; + this.store = + cacheBuilder + .evictionListener((key, value, cause) -> indexes.keySet().removeAll(indexes.get(key))) + .build(); + } + + /** Returns the value associated with the key or {@code null} if not found. */ + V getIfPresent(K key) { + var index = indexes.get(key); + return (index == null) ? null : store.getIfPresent(index.iterator().next()); + } + + /** + * Returns the value associated with the key, obtaining that value from the {@code loader} if + * necessary. The entire method invocation is performed atomically, so the function is applied at + * most once per key. As the value may be looked up by alternative keys, those function + * invocations may be executed in parallel and will replace any existing mappings when completed. + */ + V get(K key, Supplier<V> loader) { + var value = getIfPresent(key); + if (value != null) { + return value; + } + + var lock = locks.get(key); + lock.lock(); + try { + value = getIfPresent(key); + if (value != null) { + return value; + } + + value = loader.get(); + if (value == null) { + return null; + } + + put(value); + return value; + } finally { + lock.unlock(); + } + } + + /** Associates the {@code value} with its keys, replacing the old value and keys if present. */ + private void put(V value) { + putIfNewer(value, (oldValue, newValue) -> 1); + } + + /** + * Associates the {@code newValue} with its keys if it is newer than the existing value according + * to the provided {@code comparator}. If the new value is added, the old value and its associated + * keys are replaced. + * + * @param newValue the new value to be added to the cache + * @param comparator a comparator to determine if the new value is newer than the existing value. + * The comparator must support `null` values in case no value exist in the cache prior to + * invocation. + */ + private void putIfNewer(V newValue, Comparator<V> comparator) { + requireNonNull(newValue); + var index = buildIndex(newValue); + store + .asMap() + .compute( + index.iterator().next(), + (key, oldValue) -> { + if (comparator.compare(oldValue, newValue) > 0) { + if (oldValue != null) { + indexes + .keySet() + .removeAll(Sets.difference(indexes.get(index.iterator().next()), index)); + } + for (var indexKey : index) { + indexes.put(indexKey, index); + } + return newValue; + } else { + return oldValue; + } + }); + } + + /** Discards any cached value and its keys. */ + void invalidate(K key) { + var index = indexes.get(key); + if (index == null) { + return; + } + + store + .asMap() + .computeIfPresent( + index.iterator().next(), + (k, v) -> { + indexes.remove(k); Review Comment: Ooh that one was nasty. You are right, the index for the secondary key was leaking. Which is ironical as the purpose of this PR was also to fix that very leak... I have added a test case and a fix for it. To be clear, the IndexedCache still allows for race conditions internally. Like the following: 1. Get a value for id=1 and name=foo, miss 2. Enter the lock, fetch the value from the database 3. While in the `store.asMap().compute(...)` call, just after the line that added the index `name=foo <-> id=1` to the `indexes` data structure, and **before the call completes**: 4. Another caller get the value by `name=foo`, resolves to `id=1` 5. The second caller proceeds to trying to get the value from the cache, get `null` 6. Now the second caller has to wait for the `lock` to be released and try again This is a benign race condition, though. But it illustrates that the visibility of some operations are visible becore the operations are completed. This makes the code hard to maintain. Although less hard than the current `EntityCache`... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
