wchevreuil commented on code in PR #7291: URL: https://github.com/apache/hbase/pull/7291#discussion_r2408141877
########## hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheService.java: ########## @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.CheckAndMutateResult; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Consistency; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.util.MemorySizeUtil; +import org.apache.hadoop.hbase.ipc.RpcCallContext; +import 
org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; +import org.apache.hadoop.hbase.quotas.OperationQuota; + +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; + +/** + * It is responsible for populating the row cache and retrieving rows from it. + */ [email protected] +public class RowCacheService { + /** + * A barrier that prevents the row cache from being populated during region operations, such as + * bulk loads. It is implemented as a counter to address issues that arise when the same region is + * updated concurrently. + */ + private final Map<HRegion, AtomicInteger> regionLevelBarrierMap = new ConcurrentHashMap<>(); + /** + * A barrier that prevents the row cache from being populated during row mutations. It is + * implemented as a counter to address issues that arise when the same row is mutated + * concurrently. 
+ */ + private final Map<RowCacheKey, AtomicInteger> rowLevelBarrierMap = new ConcurrentHashMap<>(); + + private final boolean enabledByConf; + private final RowCache rowCache; + + @FunctionalInterface + interface RowOperation<R> { + R execute() throws IOException; + } + + RowCacheService(Configuration conf) { + enabledByConf = + conf.getFloat(HConstants.ROW_CACHE_SIZE_KEY, HConstants.ROW_CACHE_SIZE_DEFAULT) > 0; + rowCache = new RowCache(MemorySizeUtil.getRowCacheSize(conf)); + } + + RegionScannerImpl getScanner(HRegion region, Get get, Scan scan, List<Cell> results, + RpcCallContext context) throws IOException { + if (!canCacheRow(get, region)) { + return getScannerInternal(region, scan, results); + } + + RowCacheKey key = new RowCacheKey(region, get.getRow()); + + // Try get from row cache + if (tryGetFromCache(region, key, get, results)) { + // Cache is hit, and then no scanner is created + return null; + } + + RegionScannerImpl scanner = getScannerInternal(region, scan, results); + + // When results came from memstore only, do not populate the row cache + boolean readFromMemStoreOnly = context.getBlockBytesScanned() < 1; + if (!readFromMemStoreOnly) { + populateCache(region, results, key); + } + + return scanner; + } + + private RegionScannerImpl getScannerInternal(HRegion region, Scan scan, List<Cell> results) + throws IOException { + RegionScannerImpl scanner = region.getScanner(scan); + scanner.next(results); + return scanner; + } + + private boolean tryGetFromCache(HRegion region, RowCacheKey key, Get get, List<Cell> results) { + RowCells row = rowCache.getBlock(key, get.getCacheBlocks()); + + if (row == null) { + return false; + } + + results.addAll(row.getCells()); + region.addReadRequestsCount(1); + if (region.getMetrics() != null) { + region.getMetrics().updateReadRequestCount(); + } + return true; + } + + private void populateCache(HRegion region, List<Cell> results, RowCacheKey key) { + // The row cache is populated only when no region level barriers 
remain + regionLevelBarrierMap.computeIfAbsent(region, t -> { + // The row cache is populated only when no row level barriers remain + rowLevelBarrierMap.computeIfAbsent(key, k -> { + try { + rowCache.cacheBlock(key, new RowCells(results)); + } catch (CloneNotSupportedException ignored) { + // Not able to cache row cells, ignore + } + return null; + }); + return null; + }); + } + + BulkLoadHFileResponse bulkLoadHFile(RSRpcServices rsRpcServices, BulkLoadHFileRequest request) + throws ServiceException { + HRegion region; + try { + region = rsRpcServices.getRegion(request.getRegion()); + } catch (IOException ie) { + throw new ServiceException(ie); + } + + if (!region.getTableDescriptor().isRowCacheEnabled()) { + return bulkLoad(rsRpcServices, request); + } + + // Since bulkload modifies the store files, the row cache should be disabled until the bulkload + // is finished. + createRegionLevelBarrier(region); + try { + // We do not invalidate the entire row cache directly, as it contains a large number of + // entries and takes a long time. Instead, we increment rowCacheSeqNum, which is used when + // constructing a RowCacheKey, thereby making the existing row cache entries stale. + increaseRowCacheSeqNum(region); + return bulkLoad(rsRpcServices, request); + } finally { + // The row cache for the region has been enabled again + removeTableLevelBarrier(region); + } + } + + BulkLoadHFileResponse bulkLoad(RSRpcServices rsRpcServices, BulkLoadHFileRequest request) + throws ServiceException { + return rsRpcServices.bulkLoadHFileInternal(request); + } + + void increaseRowCacheSeqNum(HRegion region) { + region.increaseRowCacheSeqNum(); + } + + void removeTableLevelBarrier(HRegion region) { + regionLevelBarrierMap.computeIfPresent(region, (k, counter) -> { + int remaining = counter.decrementAndGet(); + return (remaining <= 0) ? 
null : counter; + }); + } + + void createRegionLevelBarrier(HRegion region) { + regionLevelBarrierMap.computeIfAbsent(region, k -> new AtomicInteger(0)).incrementAndGet(); + } + + // @formatter:off + /** + * Row cache is only enabled when the following conditions are met: + * - Row cache is enabled at the table level. + * - Cache blocks is enabled in the get request. + * - A Get object cannot be distinguished from others except by its row key. + * So we check equality for the following: + * - filter + * - retrieving cells + * - TTL + * - attributes + * - CheckExistenceOnly + * - ColumnFamilyTimeRange + * - Consistency + * - MaxResultsPerColumnFamily + * - ReplicaId + * - RowOffsetPerColumnFamily + * @param get the Get request + * @param region the Region + * @return true if the row can be cached, false otherwise + */ + // @formatter:on + boolean canCacheRow(Get get, Region region) { + return enabledByConf && region.getTableDescriptor().isRowCacheEnabled() && get.getCacheBlocks() + && get.getFilter() == null && isRetrieveAllCells(get, region) && isDefaultTtl(region) + && get.getAttributesMap().isEmpty() && !get.isCheckExistenceOnly() + && get.getColumnFamilyTimeRange().isEmpty() && get.getConsistency() == Consistency.STRONG + && get.getMaxResultsPerColumnFamily() == -1 && get.getReplicaId() == -1 + && get.getRowOffsetPerColumnFamily() == 0 && get.getTimeRange().isAllTime(); + } + + private static boolean isRetrieveAllCells(Get get, Region region) { + if (region.getTableDescriptor().getColumnFamilyCount() != get.numFamilies()) { + return false; + } + + boolean hasQualifier = get.getFamilyMap().values().stream().anyMatch(Objects::nonNull); + return !hasQualifier; + } + + private static boolean isDefaultTtl(Region region) { + return Arrays.stream(region.getTableDescriptor().getColumnFamilies()) + .allMatch(cfd -> cfd.getTimeToLive() == ColumnFamilyDescriptorBuilder.DEFAULT_TTL); + } + + private <R> R mutateWithRowCacheBarrier(HRegion region, List<Mutation> mutations, + 
RowOperation<R> operation) throws IOException { + if (!region.getTableDescriptor().isRowCacheEnabled()) { + return operation.execute(); + } + + Set<RowCacheKey> rowCacheKeys = new HashSet<>(mutations.size()); + try { + // Evict the entire row cache + mutations.forEach(mutation -> rowCacheKeys.add(new RowCacheKey(region, mutation.getRow()))); + rowCacheKeys.forEach(key -> { + // Creates a barrier that prevents the row cache from being populated for this row + // during mutation. Reads for the row can instead be served from HFiles or the block cache. + createRowLevelBarrier(key); + + // After creating the barrier, evict the existing row cache for this row, + // as it becomes invalid after the mutation + evictRowCache(key); + }); + + return execute(operation); + } finally { + // Remove the barrier after mutation to allow the row cache to be populated again + rowCacheKeys.forEach(this::removeRowLevelBarrier); + } + } + + private <R> R mutateWithRowCacheBarrier(HRegion region, byte[] row, RowOperation<R> operation) + throws IOException { + if (!region.getTableDescriptor().isRowCacheEnabled()) { + return operation.execute(); + } + + RowCacheKey key = new RowCacheKey(region, row); + try { + // Creates a barrier that prevents the row cache from being populated for this row + // during mutation. Reads for the row can instead be served from HFiles or the block cache. + createRowLevelBarrier(key); + + // After creating the barrier, evict the existing row cache for this row, + // as it becomes invalid after the mutation + evictRowCache(key); + + return execute(operation); + } finally { + // Remove the barrier after mutation to allow the row cache to be populated again + removeRowLevelBarrier(key); Review Comment: Should (or could) we recache the mutated row? 
########## hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java: ########## @@ -8696,6 +8703,17 @@ public long getOpenSeqNum() { return this.openSeqNum; } + public long getRowCacheSeqNum() { + return this.rowCacheSeqNum.get(); + } + + /** + * This is used to invalidate the entire row cache after bulk loading. + */ Review Comment: Is this comment correct? I thought we would be invalidating only the rows for the given regions. Rows from regions not touched by bulkload would stay valid. ########## hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCache.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Policy; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.atomic.LongAdder; +import org.checkerframework.checker.nullness.qual.NonNull; + +/** + * A cache that stores rows retrieved by Get operations, using Caffeine as the underlying cache + * implementation. + */ [email protected] +public class RowCache { + private final class EvictionListener + implements RemovalListener<@NonNull RowCacheKey, @NonNull RowCells> { + @Override + public void onRemoval(RowCacheKey key, RowCells value, @NonNull RemovalCause cause) { + evictedRowCount.increment(); + } + } + + private final Cache<@NonNull RowCacheKey, RowCells> cache; + + private final LongAdder hitCount = new LongAdder(); + private final LongAdder missCount = new LongAdder(); + private final LongAdder evictedRowCount = new LongAdder(); + Review Comment: Can't we use Cache.stats() for this? ########## hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheService.java: ########## @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.CheckAndMutateResult; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Consistency; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.util.MemorySizeUtil; +import org.apache.hadoop.hbase.ipc.RpcCallContext; +import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; +import org.apache.hadoop.hbase.quotas.OperationQuota; + +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; + +/** + * It is responsible for populating the row cache and retrieving rows from it. 
+ */ [email protected] +public class RowCacheService { + /** + * A barrier that prevents the row cache from being populated during region operations, such as + * bulk loads. It is implemented as a counter to address issues that arise when the same region is + * updated concurrently. + */ + private final Map<HRegion, AtomicInteger> regionLevelBarrierMap = new ConcurrentHashMap<>(); + /** + * A barrier that prevents the row cache from being populated during row mutations. It is + * implemented as a counter to address issues that arise when the same row is mutated + * concurrently. + */ + private final Map<RowCacheKey, AtomicInteger> rowLevelBarrierMap = new ConcurrentHashMap<>(); + + private final boolean enabledByConf; + private final RowCache rowCache; Review Comment: Should we consider defining an interface for RowCache and referring to that interface from here, so that we can accommodate future RowCache implementations beyond the Caffeine-based one currently provided as the reference implementation? ########## hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCache.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Policy; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.atomic.LongAdder; +import org.checkerframework.checker.nullness.qual.NonNull; + +/** + * A cache that stores rows retrieved by Get operations, using Caffeine as the underlying cache + * implementation. + */ [email protected] +public class RowCache { + private final class EvictionListener + implements RemovalListener<@NonNull RowCacheKey, @NonNull RowCells> { + @Override + public void onRemoval(RowCacheKey key, RowCells value, @NonNull RemovalCause cause) { + evictedRowCount.increment(); + } + } + + private final Cache<@NonNull RowCacheKey, RowCells> cache; + + private final LongAdder hitCount = new LongAdder(); + private final LongAdder missCount = new LongAdder(); + private final LongAdder evictedRowCount = new LongAdder(); + + RowCache(long maxSizeBytes) { + if (maxSizeBytes <= 0) { + cache = Caffeine.newBuilder().maximumSize(0).build(); + return; + } + + cache = + Caffeine.newBuilder().maximumWeight(maxSizeBytes).removalListener(new EvictionListener()) + .weigher((RowCacheKey key, + RowCells value) -> (int) Math.min(key.heapSize() + value.heapSize(), Integer.MAX_VALUE)) + .build(); + } + + void cacheBlock(RowCacheKey key, RowCells value) { + cache.put(key, value); + } + + public RowCells getBlock(RowCacheKey key, boolean caching) { + if (!caching) { + missCount.increment(); + return null; + } + + RowCells value = cache.getIfPresent(key); + if (value == null) { + missCount.increment(); + } else { + hitCount.increment(); + } + return value; + } + + void evictBlock(RowCacheKey key) { + cache.asMap().remove(key); + } Review Comment: We should rename all these 
methods, as we are not caching blocks, but rows. ########## hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheService.java: ########## @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.CheckAndMutateResult; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Consistency; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.util.MemorySizeUtil; +import 
org.apache.hadoop.hbase.ipc.RpcCallContext; +import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; +import org.apache.hadoop.hbase.quotas.OperationQuota; + +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; + +/** + * It is responsible for populating the row cache and retrieving rows from it. + */ [email protected] +public class RowCacheService { + /** + * A barrier that prevents the row cache from being populated during region operations, such as + * bulk loads. It is implemented as a counter to address issues that arise when the same region is + * updated concurrently. + */ + private final Map<HRegion, AtomicInteger> regionLevelBarrierMap = new ConcurrentHashMap<>(); + /** + * A barrier that prevents the row cache from being populated during row mutations. It is + * implemented as a counter to address issues that arise when the same row is mutated + * concurrently. 
+ */ + private final Map<RowCacheKey, AtomicInteger> rowLevelBarrierMap = new ConcurrentHashMap<>(); + + private final boolean enabledByConf; + private final RowCache rowCache; + + @FunctionalInterface + interface RowOperation<R> { + R execute() throws IOException; + } + + RowCacheService(Configuration conf) { + enabledByConf = + conf.getFloat(HConstants.ROW_CACHE_SIZE_KEY, HConstants.ROW_CACHE_SIZE_DEFAULT) > 0; + rowCache = new RowCache(MemorySizeUtil.getRowCacheSize(conf)); + } + + RegionScannerImpl getScanner(HRegion region, Get get, Scan scan, List<Cell> results, + RpcCallContext context) throws IOException { + if (!canCacheRow(get, region)) { + return getScannerInternal(region, scan, results); + } + + RowCacheKey key = new RowCacheKey(region, get.getRow()); + + // Try get from row cache + if (tryGetFromCache(region, key, get, results)) { + // Cache is hit, and then no scanner is created + return null; + } + + RegionScannerImpl scanner = getScannerInternal(region, scan, results); + + // When results came from memstore only, do not populate the row cache + boolean readFromMemStoreOnly = context.getBlockBytesScanned() < 1; + if (!readFromMemStoreOnly) { + populateCache(region, results, key); + } + + return scanner; + } + + private RegionScannerImpl getScannerInternal(HRegion region, Scan scan, List<Cell> results) + throws IOException { + RegionScannerImpl scanner = region.getScanner(scan); + scanner.next(results); + return scanner; + } + + private boolean tryGetFromCache(HRegion region, RowCacheKey key, Get get, List<Cell> results) { + RowCells row = rowCache.getBlock(key, get.getCacheBlocks()); + + if (row == null) { + return false; + } + + results.addAll(row.getCells()); + region.addReadRequestsCount(1); + if (region.getMetrics() != null) { + region.getMetrics().updateReadRequestCount(); + } + return true; + } + + private void populateCache(HRegion region, List<Cell> results, RowCacheKey key) { + // The row cache is populated only when no region level barriers 
remain + regionLevelBarrierMap.computeIfAbsent(region, t -> { + // The row cache is populated only when no row level barriers remain + rowLevelBarrierMap.computeIfAbsent(key, k -> { + try { + rowCache.cacheBlock(key, new RowCells(results)); + } catch (CloneNotSupportedException ignored) { + // Not able to cache row cells, ignore + } + return null; + }); + return null; + }); + } + + BulkLoadHFileResponse bulkLoadHFile(RSRpcServices rsRpcServices, BulkLoadHFileRequest request) + throws ServiceException { + HRegion region; + try { + region = rsRpcServices.getRegion(request.getRegion()); + } catch (IOException ie) { + throw new ServiceException(ie); + } + + if (!region.getTableDescriptor().isRowCacheEnabled()) { + return bulkLoad(rsRpcServices, request); + } + + // Since bulkload modifies the store files, the row cache should be disabled until the bulkload + // is finished. + createRegionLevelBarrier(region); + try { + // We do not invalidate the entire row cache directly, as it contains a large number of + // entries and takes a long time. Instead, we increment rowCacheSeqNum, which is used when + // constructing a RowCacheKey, thereby making the existing row cache entries stale. + increaseRowCacheSeqNum(region); + return bulkLoad(rsRpcServices, request); + } finally { + // The row cache for the region has been enabled again + removeTableLevelBarrier(region); + } + } + + BulkLoadHFileResponse bulkLoad(RSRpcServices rsRpcServices, BulkLoadHFileRequest request) + throws ServiceException { + return rsRpcServices.bulkLoadHFileInternal(request); + } + + void increaseRowCacheSeqNum(HRegion region) { + region.increaseRowCacheSeqNum(); + } + + void removeTableLevelBarrier(HRegion region) { + regionLevelBarrierMap.computeIfPresent(region, (k, counter) -> { + int remaining = counter.decrementAndGet(); + return (remaining <= 0) ? 
null : counter; + }); + } + + void createRegionLevelBarrier(HRegion region) { + regionLevelBarrierMap.computeIfAbsent(region, k -> new AtomicInteger(0)).incrementAndGet(); + } + + // @formatter:off + /** + * Row cache is only enabled when the following conditions are met: + * - Row cache is enabled at the table level. + * - Cache blocks is enabled in the get request. + * - A Get object cannot be distinguished from others except by its row key. + * So we check equality for the following: + * - filter + * - retrieving cells + * - TTL + * - attributes + * - CheckExistenceOnly + * - ColumnFamilyTimeRange + * - Consistency + * - MaxResultsPerColumnFamily + * - ReplicaId + * - RowOffsetPerColumnFamily + * @param get the Get request + * @param region the Region + * @return true if the row can be cached, false otherwise + */ + // @formatter:on + boolean canCacheRow(Get get, Region region) { + return enabledByConf && region.getTableDescriptor().isRowCacheEnabled() && get.getCacheBlocks() + && get.getFilter() == null && isRetrieveAllCells(get, region) && isDefaultTtl(region) + && get.getAttributesMap().isEmpty() && !get.isCheckExistenceOnly() + && get.getColumnFamilyTimeRange().isEmpty() && get.getConsistency() == Consistency.STRONG + && get.getMaxResultsPerColumnFamily() == -1 && get.getReplicaId() == -1 + && get.getRowOffsetPerColumnFamily() == 0 && get.getTimeRange().isAllTime(); + } + + private static boolean isRetrieveAllCells(Get get, Region region) { + if (region.getTableDescriptor().getColumnFamilyCount() != get.numFamilies()) { + return false; + } + + boolean hasQualifier = get.getFamilyMap().values().stream().anyMatch(Objects::nonNull); + return !hasQualifier; + } + + private static boolean isDefaultTtl(Region region) { + return Arrays.stream(region.getTableDescriptor().getColumnFamilies()) + .allMatch(cfd -> cfd.getTimeToLive() == ColumnFamilyDescriptorBuilder.DEFAULT_TTL); + } + + private <R> R mutateWithRowCacheBarrier(HRegion region, List<Mutation> mutations, + 
RowOperation<R> operation) throws IOException { + if (!region.getTableDescriptor().isRowCacheEnabled()) { + return operation.execute(); + } + + Set<RowCacheKey> rowCacheKeys = new HashSet<>(mutations.size()); + try { + // Evict the entire row cache + mutations.forEach(mutation -> rowCacheKeys.add(new RowCacheKey(region, mutation.getRow()))); + rowCacheKeys.forEach(key -> { + // Creates a barrier that prevents the row cache from being populated for this row + // during mutation. Reads for the row can instead be served from HFiles or the block cache. + createRowLevelBarrier(key); + + // After creating the barrier, evict the existing row cache for this row, + // as it becomes invalid after the mutation + evictRowCache(key); + }); + + return execute(operation); + } finally { + // Remove the barrier after mutation to allow the row cache to be populated again + rowCacheKeys.forEach(this::removeRowLevelBarrier); + } + } + + private <R> R mutateWithRowCacheBarrier(HRegion region, byte[] row, RowOperation<R> operation) + throws IOException { + if (!region.getTableDescriptor().isRowCacheEnabled()) { + return operation.execute(); + } + + RowCacheKey key = new RowCacheKey(region, row); + try { + // Creates a barrier that prevents the row cache from being populated for this row + // during mutation. Reads for the row can instead be served from HFiles or the block cache. + createRowLevelBarrier(key); + + // After creating the barrier, evict the existing row cache for this row, + // as it becomes invalid after the mutation + evictRowCache(key); + + return execute(operation); + } finally { + // Remove the barrier after mutation to allow the row cache to be populated again + removeRowLevelBarrier(key); + } + } + + <R> R execute(RowOperation<R> operation) throws IOException { + return operation.execute(); + } + + void evictRowCache(RowCacheKey key) { Review Comment: nit: should be "evictRow". 
########## hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheKey.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.Arrays; +import java.util.Objects; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; + [email protected] +public class RowCacheKey implements HeapSize { + public static final long FIXED_OVERHEAD = ClassSize.estimateBase(RowCacheKey.class, false); + + private final String encodedRegionName; + private final byte[] rowKey; + + // Row cache keys should not be evicted on close, since the cache may contain many entries and + // eviction would be slow. Instead, the region’s rowCacheSeqNum is used to generate new keys that + // ignore the existing cache when the region is reopened or bulk-loaded. Review Comment: So when do stale rows in the row cache get evicted? And if we don't evict rows from the cache for a closed region, would they keep wasting cache space until the cache is full and the LFU logic finally selects them for eviction? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
