This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ced6bc282e Use higher fetch timeout for Kinesis (#12214)
ced6bc282e is described below

commit ced6bc282ea9049f45f59f99738e5f1132a03a18
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Mon Jan 22 22:16:02 2024 +0530

    Use higher fetch timeout for Kinesis (#12214)
    
    * Use higher fetch timeout for Kinesis
    
    * Add todo
    
    * Add test
    
    ---------
    
    Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local>
    Co-authored-by: Kartik Khare 
<kharekar...@kartiks-macbook-pro.tail8a064.ts.net>
---
 .../core/realtime/stream/StreamConfigTest.java     | 27 ++++++++++++++++++++++
 .../org/apache/pinot/spi/stream/StreamConfig.java  |  7 +++++-
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
index 333eecab04..11c7ee2010 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
@@ -403,4 +403,31 @@ public class StreamConfigTest {
       // expected
     }
   }
+
+  @Test
+  public void testKinesisFetchTimeout() {
+    String streamType = "fakeStream";
+    String topic = "fakeTopic";
+    String tableName = "fakeTable_REALTIME";
+    String consumerFactoryClass = "KinesisConsumerFactory";
+    String decoderClass = FakeStreamMessageDecoder.class.getName();
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, 
StreamConfigProperties.STREAM_TOPIC_NAME), topic);
+    
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+        StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), 
consumerFactoryClass);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, 
StreamConfigProperties.STREAM_DECODER_CLASS),
+        decoderClass);
+
+    String consumerType = "simple";
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, 
StreamConfigProperties.STREAM_CONSUMER_TYPES),
+        consumerType);
+    StreamConfig streamConfig = new StreamConfig(tableName, streamConfigMap);
+
+    assertEquals(streamConfig.getFetchTimeoutMillis(), 
StreamConfig.DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS_KINESIS);
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index 94f8adf566..ea24f5d01b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -48,6 +48,7 @@ public class StreamConfig {
 
   public static final long DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS = 30_000;
   public static final int DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS = 5_000;
+  public static final int DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS_KINESIS = 
600_000;
   public static final int DEFAULT_IDLE_TIMEOUT_MILLIS = 3 * 60 * 1000;
 
   private static final double CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED = -1;
@@ -142,7 +143,11 @@ public class StreamConfig {
     }
     _connectionTimeoutMillis = connectionTimeoutMillis;
 
-    int fetchTimeoutMillis = DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS;
+    // For Kinesis, we need to set a higher fetch timeout to avoid getting 
stuck in empty records loop
+    // TODO: Remove this once we have a better way to handle empty records in 
Kinesis
+    int fetchTimeoutMillis =
+        _consumerFactoryClassName.contains("KinesisConsumerFactory") ? 
DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS_KINESIS
+            : DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS;
     String fetchTimeoutKey =
         StreamConfigProperties.constructStreamProperty(_type, 
StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS);
     String fetchTimeoutValue = streamConfigMap.get(fetchTimeoutKey);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to