This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new dd8a6477c2 Reduce Kinesis default rate limit to 1 to account for replication (#13649) dd8a6477c2 is described below commit dd8a6477c22e761a2a6dc3264b9c2a86c94427fd Author: Kartik Khare <kharekar...@gmail.com> AuthorDate: Wed Jul 31 12:47:24 2024 +0530 Reduce Kinesis default rate limit to 1 to account for replication (#13649) * Use debug logs in case we run into rate limit exceeded exception * lower kinesis rate limit * Fix exception --------- Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local> --- .../apache/pinot/plugin/stream/kinesis/KinesisConfig.java | 8 +++++++- .../pinot/plugin/stream/kinesis/KinesisConsumer.java | 14 ++++++++++++-- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java index 529e218e90..6f84407006 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java @@ -70,7 +70,13 @@ public class KinesisConfig { public static final String DEFAULT_IAM_ROLE_BASED_ACCESS_ENABLED = "false"; public static final String DEFAULT_SESSION_DURATION_SECONDS = "900"; public static final String DEFAULT_ASYNC_SESSION_UPDATED_ENABLED = "true"; - public static final String DEFAULT_RPS_LIMIT = "5"; + + // Kinesis has a default limit of 5 getRecord requests per second per shard. + // This limit is enforced by Kinesis and is not configurable. + // We are setting it to 1 to avoid hitting the limit in a replicated setup, + // where multiple replicas are fetching from the same shard. + // see - https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html + public static final String DEFAULT_RPS_LIMIT = "1"; private final String _streamTopicName; private final String _awsRegion; diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index e7bb76797a..d90b1b61bb 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -63,7 +63,17 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti @Override public synchronized KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) { - KinesisPartitionGroupOffset startOffset = (KinesisPartitionGroupOffset) startMsgOffset; + try { + return getKinesisMessageBatch((KinesisPartitionGroupOffset) startMsgOffset); + } catch (ProvisionedThroughputExceededException pte) { + LOGGER.error("Rate limit exceeded while fetching messages from Kinesis stream: {} with threshold: {}", + pte.getMessage(), _config.getRpsLimit()); + return new KinesisMessageBatch(List.of(), (KinesisPartitionGroupOffset) startMsgOffset, false); + } + } + + private KinesisMessageBatch getKinesisMessageBatch(KinesisPartitionGroupOffset startMsgOffset) { + KinesisPartitionGroupOffset startOffset = startMsgOffset; String shardId = startOffset.getShardId(); String startSequenceNumber = startOffset.getSequenceNumber(); // Get the shard iterator @@ -122,7 +132,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti } catch (InterruptedException e) { throw new RuntimeException(e); } - _currentSecond++; + _currentSecond = (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); _numRequestsInCurrentSecond = 1; } else { _numRequestsInCurrentSecond++; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org