This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 718f41f411 Don't throw exception if partition count can't be fetched 
(#9249)
718f41f411 is described below

commit 718f41f4110998e7cb6bc60ad449711c722568c2
Author: Sajjad Moradi <moradi.saj...@gmail.com>
AuthorDate: Sat Aug 20 09:42:02 2022 -0700

    Don't throw exception if partition count can't be fetched (#9249)
---
 .../realtime/RealtimeConsumptionRateManager.java   | 49 +++++++++++++++++----
 .../RealtimeConsumptionRateManagerTest.java        | 51 ++++++++++++++++++++--
 2 files changed, 88 insertions(+), 12 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
index fb9fb410f1..8a4dc9b40c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
@@ -23,6 +23,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.RateLimiter;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -55,7 +57,8 @@ public class RealtimeConsumptionRateManager {
   }
 
   private static class InstanceHolder {
-    private static final RealtimeConsumptionRateManager INSTANCE = new 
RealtimeConsumptionRateManager(buildCache());
+    private static final RealtimeConsumptionRateManager INSTANCE = new 
RealtimeConsumptionRateManager(
+        buildCache(DEFAULT_PARTITION_COUNT_FETCHER, 
CACHE_ENTRY_EXPIRATION_TIME_IN_MINUTES, TimeUnit.MINUTES));
   }
 
   public static RealtimeConsumptionRateManager getInstance() {
@@ -85,17 +88,27 @@ public class RealtimeConsumptionRateManager {
     return new RateLimiterImpl(partitionRateLimit);
   }
 
-  private static LoadingCache<StreamConfig, Integer> buildCache() {
-    return 
CacheBuilder.newBuilder().expireAfterWrite(CACHE_ENTRY_EXPIRATION_TIME_IN_MINUTES,
 TimeUnit.MINUTES)
+  @VisibleForTesting
+  static LoadingCache<StreamConfig, Integer> buildCache(PartitionCountFetcher 
partitionCountFetcher,
+      long duration, TimeUnit unit) {
+    return CacheBuilder.newBuilder().refreshAfterWrite(duration, unit)
         .build(new CacheLoader<StreamConfig, Integer>() {
           @Override
-          public Integer load(StreamConfig streamConfig)
+          public Integer load(StreamConfig key)
               throws Exception {
-            String clientId = streamConfig.getTopicName() + 
"-consumption.rate.manager";
-            StreamConsumerFactory factory = 
StreamConsumerFactoryProvider.create(streamConfig);
-            try (StreamMetadataProvider streamMetadataProvider = 
factory.createStreamMetadataProvider(clientId)) {
-              return 
streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs*/10_000);
-            }
+            // this method is called the first time cache is used for the 
given streamConfig
+            Integer count = partitionCountFetcher.fetch(key);
+            // if the count cannot be fetched, don't throw exception; return 1.
+            // The overall consumption rate will be higher, but we prefer that 
over not consuming at all.
+            return count != null ? count : 1;
+          }
+
+          @Override
+          public ListenableFuture<Integer> reload(StreamConfig key, Integer 
oldValue)
+              throws Exception {
+            // if partition count fetcher cannot fetch the value, old value is 
returned
+            Integer count = partitionCountFetcher.fetch(key);
+            return Futures.immediateFuture(count != null ? count : oldValue);
           }
         });
   }
@@ -131,4 +144,22 @@ public class RealtimeConsumptionRateManager {
       return _rate;
     }
   }
+
+  @VisibleForTesting
+  @FunctionalInterface
+  interface PartitionCountFetcher {
+    Integer fetch(StreamConfig streamConfig);
+  }
+
+  @VisibleForTesting
+  static final PartitionCountFetcher DEFAULT_PARTITION_COUNT_FETCHER = 
streamConfig -> {
+    String clientId = streamConfig.getTopicName() + 
"-consumption.rate.manager";
+    StreamConsumerFactory factory = 
StreamConsumerFactoryProvider.create(streamConfig);
+    try (StreamMetadataProvider streamMetadataProvider = 
factory.createStreamMetadataProvider(clientId)) {
+      return 
streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs*/10_000);
+    } catch (Exception e) {
+      LOGGER.warn("Error fetching metadata for topic " + 
streamConfig.getTopicName(), e);
+      return null;
+    }
+  };
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
index 47eddc5d1a..cb795f7213 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
@@ -21,13 +21,14 @@ package org.apache.pinot.core.data.manager.realtime;
 import com.google.common.cache.LoadingCache;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.testng.annotations.Test;
 
-import static 
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.ConsumptionRateLimiter;
-import static 
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.NOOP_RATE_LIMITER;
-import static 
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.RateLimiterImpl;
+import static 
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.*;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 
@@ -71,4 +72,48 @@ public class RealtimeConsumptionRateManagerTest {
     rateLimiter = _consumptionRateManager.createRateLimiter(STREAM_CONFIG_C, 
TABLE_NAME);
     assertEquals(rateLimiter, NOOP_RATE_LIMITER);
   }
+
+  @Test
+  public void testBuildCache() throws Exception {
+    PartitionCountFetcher partitionCountFetcher = 
mock(PartitionCountFetcher.class);
+    LoadingCache<StreamConfig, Integer> cache = 
buildCache(partitionCountFetcher, 500, TimeUnit.MILLISECONDS);
+    when(partitionCountFetcher.fetch(STREAM_CONFIG_A)).thenReturn(10);
+    when(partitionCountFetcher.fetch(STREAM_CONFIG_B)).thenReturn(20);
+    assertEquals((int) cache.get(STREAM_CONFIG_A), 10); // call fetcher in 
load method
+    assertEquals((int) cache.get(STREAM_CONFIG_A), 10); // use cache
+    assertEquals((int) cache.get(STREAM_CONFIG_A), 10); // use cache
+    assertEquals((int) cache.get(STREAM_CONFIG_B), 20); // call fetcher in 
load method
+    assertEquals((int) cache.get(STREAM_CONFIG_B), 20); // use cache
+    verify(partitionCountFetcher, times(1)).fetch(STREAM_CONFIG_A); // count 
changes
+    verify(partitionCountFetcher, times(1)).fetch(STREAM_CONFIG_B); // count 
changes
+    when(partitionCountFetcher.fetch(STREAM_CONFIG_A)).thenReturn(11);
+    when(partitionCountFetcher.fetch(STREAM_CONFIG_B)).thenReturn(21);
+    assertEquals((int) cache.get(STREAM_CONFIG_A), 10); // use cache
+    assertEquals((int) cache.get(STREAM_CONFIG_B), 20); // use cache
+    Thread.sleep(550); // wait till cache expires
+    assertEquals((int) cache.get(STREAM_CONFIG_A), 11); // call fetcher in 
reload method
+    assertEquals((int) cache.get(STREAM_CONFIG_A), 11); // use cache
+    assertEquals((int) cache.get(STREAM_CONFIG_A), 11); // use cache
+    assertEquals((int) cache.get(STREAM_CONFIG_B), 21); // call fetcher in 
reload method
+    assertEquals((int) cache.get(STREAM_CONFIG_B), 21); // use cache
+    verify(partitionCountFetcher, times(2)).fetch(STREAM_CONFIG_A);
+    verify(partitionCountFetcher, times(2)).fetch(STREAM_CONFIG_B);
+    when(partitionCountFetcher.fetch(STREAM_CONFIG_A)).thenReturn(null); // 
unsuccessful fetch
+    when(partitionCountFetcher.fetch(STREAM_CONFIG_B)).thenReturn(22);
+    Thread.sleep(550); // wait till cache expires
+    assertEquals((int) cache.get(STREAM_CONFIG_A), 11); // call fetcher in 
reload method
+    assertEquals((int) cache.get(STREAM_CONFIG_A), 11); // use cache
+    assertEquals((int) cache.get(STREAM_CONFIG_A), 11); // use cache
+    assertEquals((int) cache.get(STREAM_CONFIG_B), 22); // call fetcher in 
reload method
+    assertEquals((int) cache.get(STREAM_CONFIG_B), 22); // use cache
+    verify(partitionCountFetcher, times(3)).fetch(STREAM_CONFIG_A);
+    verify(partitionCountFetcher, times(3)).fetch(STREAM_CONFIG_B);
+
+    // unsuccessful fetch in the first call for config C
+    when(partitionCountFetcher.fetch(STREAM_CONFIG_C)).thenReturn(null); // 
unsuccessful fetch
+    assertEquals((int) cache.get(STREAM_CONFIG_C), 1); // call fetcher in load 
method
+    assertEquals((int) cache.get(STREAM_CONFIG_C), 1); // use cache
+    assertEquals((int) cache.get(STREAM_CONFIG_C), 1); // use cache
+    verify(partitionCountFetcher, times(1)).fetch(STREAM_CONFIG_C);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to