This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new ced6bc282e Use higher fetch timeout for Kinesis (#12214) ced6bc282e is described below commit ced6bc282ea9049f45f59f99738e5f1132a03a18 Author: Kartik Khare <kharekar...@gmail.com> AuthorDate: Mon Jan 22 22:16:02 2024 +0530 Use higher fetch timeout for Kinesis (#12214) * Use higher fetch timeout for Kinesis * Add todo * Add test --------- Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local> Co-authored-by: Kartik Khare <kharekar...@kartiks-macbook-pro.tail8a064.ts.net> --- .../core/realtime/stream/StreamConfigTest.java | 27 ++++++++++++++++++++++ .../org/apache/pinot/spi/stream/StreamConfig.java | 7 +++++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java index 333eecab04..11c7ee2010 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java @@ -403,4 +403,31 @@ public class StreamConfigTest { // expected } } + + @Test + public void testKinesisFetchTimeout() { + String streamType = "fakeStream"; + String topic = "fakeTopic"; + String tableName = "fakeTable_REALTIME"; + String consumerFactoryClass = "KinesisConsumerFactory"; + String decoderClass = FakeStreamMessageDecoder.class.getName(); + + Map<String, String> streamConfigMap = new HashMap<>(); + streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType); + streamConfigMap.put( + StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic); + streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType, + StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClass); + streamConfigMap.put( + StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS), + decoderClass); + + String consumerType = "simple"; + streamConfigMap.put( + StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES), + consumerType); + StreamConfig streamConfig = new StreamConfig(tableName, streamConfigMap); + + assertEquals(streamConfig.getFetchTimeoutMillis(), StreamConfig.DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS_KINESIS); + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java index 94f8adf566..ea24f5d01b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java @@ -48,6 +48,7 @@ public class StreamConfig { public static final long DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS = 30_000; public static final int DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS = 5_000; + public static final int DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS_KINESIS = 600_000; public static final int DEFAULT_IDLE_TIMEOUT_MILLIS = 3 * 60 * 1000; private static final double CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED = -1; @@ -142,7 +143,11 @@ public class StreamConfig { } _connectionTimeoutMillis = connectionTimeoutMillis; - int fetchTimeoutMillis = DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS; + // For Kinesis, we need to set a higher fetch timeout to avoid getting stuck in empty records loop + // TODO: Remove this once we have a better way to handle empty records in Kinesis + int fetchTimeoutMillis = + _consumerFactoryClassName.contains("KinesisConsumerFactory") ? DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS_KINESIS + : DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS; String fetchTimeoutKey = StreamConfigProperties.constructStreamProperty(_type, StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS); String fetchTimeoutValue = streamConfigMap.get(fetchTimeoutKey); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org