This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new dd8a6477c2 Reduce Kinesis default rate limit to 1 to account for 
replication (#13649)
dd8a6477c2 is described below

commit dd8a6477c22e761a2a6dc3264b9c2a86c94427fd
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Wed Jul 31 12:47:24 2024 +0530

    Reduce Kinesis default rate limit to 1 to account for replication (#13649)
    
    * Use debug logs in case we run into rate limit exceeded exception
    
    * lower kinesis rate limit
    
    * Fix exception
    
    ---------
    
    Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local>
---
 .../apache/pinot/plugin/stream/kinesis/KinesisConfig.java  |  8 +++++++-
 .../pinot/plugin/stream/kinesis/KinesisConsumer.java       | 14 ++++++++++++--
 2 files changed, 19 insertions(+), 3 deletions(-)

diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index 529e218e90..6f84407006 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -70,7 +70,13 @@ public class KinesisConfig {
   public static final String DEFAULT_IAM_ROLE_BASED_ACCESS_ENABLED = "false";
   public static final String DEFAULT_SESSION_DURATION_SECONDS = "900";
   public static final String DEFAULT_ASYNC_SESSION_UPDATED_ENABLED = "true";
-  public static final String DEFAULT_RPS_LIMIT = "5";
+
+  // Kinesis has a default limit of 5 getRecord requests per second per shard.
+  // This limit is enforced by Kinesis and is not configurable.
+  // We are setting it to 1 to avoid hitting the limit  in a replicated setup,
+  // where multiple replicas are fetching from the same shard.
+  // see - 
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
+  public static final String DEFAULT_RPS_LIMIT = "1";
 
   private final String _streamTopicName;
   private final String _awsRegion;
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index e7bb76797a..d90b1b61bb 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -63,7 +63,17 @@ public class KinesisConsumer extends 
KinesisConnectionHandler implements Partiti
 
   @Override
   public synchronized KinesisMessageBatch 
fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) {
-    KinesisPartitionGroupOffset startOffset = (KinesisPartitionGroupOffset) 
startMsgOffset;
+    try {
+      return getKinesisMessageBatch((KinesisPartitionGroupOffset) 
startMsgOffset);
+    } catch (ProvisionedThroughputExceededException pte) {
+      LOGGER.error("Rate limit exceeded while fetching messages from Kinesis 
stream: {} with threshold: {}",
+          pte.getMessage(), _config.getRpsLimit());
+      return new KinesisMessageBatch(List.of(), (KinesisPartitionGroupOffset) 
startMsgOffset, false);
+    }
+  }
+
+  private KinesisMessageBatch 
getKinesisMessageBatch(KinesisPartitionGroupOffset startMsgOffset) {
+    KinesisPartitionGroupOffset startOffset = startMsgOffset;
     String shardId = startOffset.getShardId();
     String startSequenceNumber = startOffset.getSequenceNumber();
     // Get the shard iterator
@@ -122,7 +132,7 @@ public class KinesisConsumer extends 
KinesisConnectionHandler implements Partiti
         } catch (InterruptedException e) {
           throw new RuntimeException(e);
         }
-        _currentSecond++;
+        _currentSecond = (int) 
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
         _numRequestsInCurrentSecond = 1;
       } else {
         _numRequestsInCurrentSecond++;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to