This is an automated email from the ASF dual-hosted git repository. sajjad pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 718f41f411 Don't throw exception if partition count can't be fetched (#9249) 718f41f411 is described below commit 718f41f4110998e7cb6bc60ad449711c722568c2 Author: Sajjad Moradi <moradi.saj...@gmail.com> AuthorDate: Sat Aug 20 09:42:02 2022 -0700 Don't throw exception if partition count can't be fetched (#9249) --- .../realtime/RealtimeConsumptionRateManager.java | 49 +++++++++++++++++---- .../RealtimeConsumptionRateManagerTest.java | 51 ++++++++++++++++++++-- 2 files changed, 88 insertions(+), 12 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java index fb9fb410f1..8a4dc9b40c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java @@ -23,6 +23,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -55,7 +57,8 @@ public class RealtimeConsumptionRateManager { } private static class InstanceHolder { - private static final RealtimeConsumptionRateManager INSTANCE = new RealtimeConsumptionRateManager(buildCache()); + private static final RealtimeConsumptionRateManager INSTANCE = new RealtimeConsumptionRateManager( + buildCache(DEFAULT_PARTITION_COUNT_FETCHER, CACHE_ENTRY_EXPIRATION_TIME_IN_MINUTES, TimeUnit.MINUTES)); } public static RealtimeConsumptionRateManager getInstance() { @@ -85,17 +88,27 @@ public class RealtimeConsumptionRateManager { return new RateLimiterImpl(partitionRateLimit); } - private static LoadingCache<StreamConfig, Integer> buildCache() { - return CacheBuilder.newBuilder().expireAfterWrite(CACHE_ENTRY_EXPIRATION_TIME_IN_MINUTES, TimeUnit.MINUTES) + @VisibleForTesting + static LoadingCache<StreamConfig, Integer> buildCache(PartitionCountFetcher partitionCountFetcher, + long duration, TimeUnit unit) { + return CacheBuilder.newBuilder().refreshAfterWrite(duration, unit) .build(new CacheLoader<StreamConfig, Integer>() { @Override - public Integer load(StreamConfig streamConfig) + public Integer load(StreamConfig key) throws Exception { - String clientId = streamConfig.getTopicName() + "-consumption.rate.manager"; - StreamConsumerFactory factory = StreamConsumerFactoryProvider.create(streamConfig); - try (StreamMetadataProvider streamMetadataProvider = factory.createStreamMetadataProvider(clientId)) { - return streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs*/10_000); - } + // this method is called the first time cache is used for the given streamConfig + Integer count = partitionCountFetcher.fetch(key); + // if the count cannot be fetched, don't throw exception; return 1. + // The overall consumption rate will be higher, but we prefer that over not consuming at all. + return count != null ? count : 1; + } + + @Override + public ListenableFuture<Integer> reload(StreamConfig key, Integer oldValue) + throws Exception { + // if partition count fetcher cannot fetch the value, old value is returned + Integer count = partitionCountFetcher.fetch(key); + return Futures.immediateFuture(count != null ? count : oldValue); } }); } @@ -131,4 +144,22 @@ public class RealtimeConsumptionRateManager { return _rate; } } + + @VisibleForTesting + @FunctionalInterface + interface PartitionCountFetcher { + Integer fetch(StreamConfig streamConfig); + } + + @VisibleForTesting + static final PartitionCountFetcher DEFAULT_PARTITION_COUNT_FETCHER = streamConfig -> { + String clientId = streamConfig.getTopicName() + "-consumption.rate.manager"; + StreamConsumerFactory factory = StreamConsumerFactoryProvider.create(streamConfig); + try (StreamMetadataProvider streamMetadataProvider = factory.createStreamMetadataProvider(clientId)) { + return streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs*/10_000); + } catch (Exception e) { + LOGGER.warn("Error fetching metadata for topic " + streamConfig.getTopicName(), e); + return null; + } + }; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java index 47eddc5d1a..cb795f7213 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java @@ -21,13 +21,14 @@ package org.apache.pinot.core.data.manager.realtime; import com.google.common.cache.LoadingCache; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.pinot.spi.stream.StreamConfig; import org.testng.annotations.Test; -import static org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.ConsumptionRateLimiter; -import static org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.NOOP_RATE_LIMITER; -import static org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.RateLimiterImpl; +import static org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.*; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; @@ -71,4 +72,48 @@ public class RealtimeConsumptionRateManagerTest { rateLimiter = _consumptionRateManager.createRateLimiter(STREAM_CONFIG_C, TABLE_NAME); assertEquals(rateLimiter, NOOP_RATE_LIMITER); } + + @Test + public void testBuildCache() throws Exception { + PartitionCountFetcher partitionCountFetcher = mock(PartitionCountFetcher.class); + LoadingCache<StreamConfig, Integer> cache = buildCache(partitionCountFetcher, 500, TimeUnit.MILLISECONDS); + when(partitionCountFetcher.fetch(STREAM_CONFIG_A)).thenReturn(10); + when(partitionCountFetcher.fetch(STREAM_CONFIG_B)).thenReturn(20); + assertEquals((int) cache.get(STREAM_CONFIG_A), 10); // call fetcher in load method + assertEquals((int) cache.get(STREAM_CONFIG_A), 10); // use cache + assertEquals((int) cache.get(STREAM_CONFIG_A), 10); // use cache + assertEquals((int) cache.get(STREAM_CONFIG_B), 20); // call fetcher in load method + assertEquals((int) cache.get(STREAM_CONFIG_B), 20); // use cache + verify(partitionCountFetcher, times(1)).fetch(STREAM_CONFIG_A); // count changes + verify(partitionCountFetcher, times(1)).fetch(STREAM_CONFIG_B); // count changes + when(partitionCountFetcher.fetch(STREAM_CONFIG_A)).thenReturn(11); + when(partitionCountFetcher.fetch(STREAM_CONFIG_B)).thenReturn(21); + assertEquals((int) cache.get(STREAM_CONFIG_A), 10); // use cache + assertEquals((int) cache.get(STREAM_CONFIG_B), 20); // use cache + Thread.sleep(550); // wait till cache expires + assertEquals((int) cache.get(STREAM_CONFIG_A), 11); // call fetcher in reload method + assertEquals((int) cache.get(STREAM_CONFIG_A), 11); // use cache + assertEquals((int) cache.get(STREAM_CONFIG_A), 11); // use cache + assertEquals((int) cache.get(STREAM_CONFIG_B), 21); // call fetcher in reload method + assertEquals((int) cache.get(STREAM_CONFIG_B), 21); // use cache + verify(partitionCountFetcher, times(2)).fetch(STREAM_CONFIG_A); + verify(partitionCountFetcher, times(2)).fetch(STREAM_CONFIG_B); + when(partitionCountFetcher.fetch(STREAM_CONFIG_A)).thenReturn(null); // unsuccessful fetch + when(partitionCountFetcher.fetch(STREAM_CONFIG_B)).thenReturn(22); + Thread.sleep(550); // wait till cache expires + assertEquals((int) cache.get(STREAM_CONFIG_A), 11); // call fetcher in reload method + assertEquals((int) cache.get(STREAM_CONFIG_A), 11); // use cache + assertEquals((int) cache.get(STREAM_CONFIG_A), 11); // use cache + assertEquals((int) cache.get(STREAM_CONFIG_B), 22); // call fetcher in reload method + assertEquals((int) cache.get(STREAM_CONFIG_B), 22); // use cache + verify(partitionCountFetcher, times(3)).fetch(STREAM_CONFIG_A); + verify(partitionCountFetcher, times(3)).fetch(STREAM_CONFIG_B); + + // unsuccessful fetch in the first call for config C + when(partitionCountFetcher.fetch(STREAM_CONFIG_C)).thenReturn(null); // unsuccessful fetch + assertEquals((int) cache.get(STREAM_CONFIG_C), 1); // call fetcher in load method + assertEquals((int) cache.get(STREAM_CONFIG_C), 1); // use cache + assertEquals((int) cache.get(STREAM_CONFIG_C), 1); // use cache + verify(partitionCountFetcher, times(1)).fetch(STREAM_CONFIG_C); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org