KKcorps commented on code in PR #12697: URL: https://github.com/apache/pinot/pull/12697#discussion_r1538609473
########## pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java: ########## @@ -52,220 +46,112 @@ */ public class KinesisConsumer extends KinesisConnectionHandler implements PartitionGroupConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(KinesisConsumer.class); - public static final long SLEEP_TIME_BETWEEN_REQUESTS = 1000L; - private final String _streamTopicName; - private final int _numMaxRecordsToFetch; - private final ExecutorService _executorService; - private final ShardIteratorType _shardIteratorType; - private final int _rpsLimit; - public KinesisConsumer(KinesisConfig kinesisConfig) { - super(kinesisConfig); - _streamTopicName = kinesisConfig.getStreamTopicName(); - _numMaxRecordsToFetch = kinesisConfig.getNumMaxRecordsToFetch(); - _shardIteratorType = kinesisConfig.getShardIteratorType(); - _rpsLimit = kinesisConfig.getRpsLimit(); - _executorService = Executors.newSingleThreadExecutor(); + private int _currentSecond = 0; + private int _numRequestsInCurrentSecond = 0; + + public KinesisConsumer(KinesisConfig config) { + super(config); + LOGGER.info("Created Kinesis consumer with topic: {}, RPS limit: {}, max records per fetch: {}", + config.getStreamTopicName(), config.getRpsLimit(), config.getNumMaxRecordsToFetch()); } @VisibleForTesting - public KinesisConsumer(KinesisConfig kinesisConfig, KinesisClient kinesisClient) { - super(kinesisConfig, kinesisClient); - _kinesisClient = kinesisClient; - _streamTopicName = kinesisConfig.getStreamTopicName(); - _numMaxRecordsToFetch = kinesisConfig.getNumMaxRecordsToFetch(); - _shardIteratorType = kinesisConfig.getShardIteratorType(); - _rpsLimit = kinesisConfig.getRpsLimit(); - _executorService = Executors.newSingleThreadExecutor(); + public KinesisConsumer(KinesisConfig config, KinesisClient kinesisClient) { + super(config, kinesisClient); } /** * Fetch records from the Kinesis stream between the start and end 
KinesisCheckpoint */ @Override - public KinesisRecordsBatch fetchMessages(StreamPartitionMsgOffset startCheckpoint, - StreamPartitionMsgOffset endCheckpoint, int timeoutMs) { - List<KinesisStreamMessage> recordList = new ArrayList<>(); - Future<KinesisRecordsBatch> kinesisFetchResultFuture = - _executorService.submit(() -> getResult(startCheckpoint, endCheckpoint, recordList)); - - try { - return kinesisFetchResultFuture.get(timeoutMs, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - kinesisFetchResultFuture.cancel(true); - return handleException((KinesisPartitionGroupOffset) startCheckpoint, recordList); - } catch (Exception e) { - return handleException((KinesisPartitionGroupOffset) startCheckpoint, recordList); - } - } - - private KinesisRecordsBatch getResult(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset endOffset, - List<KinesisStreamMessage> recordList) { - KinesisPartitionGroupOffset kinesisStartCheckpoint = (KinesisPartitionGroupOffset) startOffset; - - try { - if (_kinesisClient == null) { - createConnection(); - } - - // TODO: iterate upon all the shardIds in the map - // Okay for now, since we have assumed that every partition group contains a single shard - Map<String, String> startShardToSequenceMap = kinesisStartCheckpoint.getShardToStartSequenceMap(); - Preconditions.checkState(startShardToSequenceMap.size() == 1, - "Only 1 shard per consumer supported. 
Found: %s, in startShardToSequenceMap", - startShardToSequenceMap.keySet()); - Map.Entry<String, String> startShardToSequenceNum = startShardToSequenceMap.entrySet().iterator().next(); - String shardIterator = getShardIterator(startShardToSequenceNum.getKey(), startShardToSequenceNum.getValue()); - - String kinesisEndSequenceNumber = null; - - if (endOffset != null) { - KinesisPartitionGroupOffset kinesisEndCheckpoint = (KinesisPartitionGroupOffset) endOffset; - Map<String, String> endShardToSequenceMap = kinesisEndCheckpoint.getShardToStartSequenceMap(); - Preconditions.checkState(endShardToSequenceMap.size() == 1, - "Only 1 shard per consumer supported. Found: %s, in endShardToSequenceMap", endShardToSequenceMap.keySet()); - kinesisEndSequenceNumber = endShardToSequenceMap.values().iterator().next(); - } - - String nextStartSequenceNumber; - boolean isEndOfShard = false; - long currentWindow = System.currentTimeMillis() / SLEEP_TIME_BETWEEN_REQUESTS; - int currentWindowRequests = 0; - while (shardIterator != null) { - GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build(); - - long requestSentTime = System.currentTimeMillis() / 1000; - GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest); - - if (!getRecordsResponse.records().isEmpty()) { - getRecordsResponse.records().forEach(record -> { - recordList.add( - new KinesisStreamMessage(record.partitionKey().getBytes(StandardCharsets.UTF_8), - record.data().asByteArray(), record.sequenceNumber(), - (KinesisStreamMessageMetadata) _kinesisMetadataExtractor.extract(record), - record.data().asByteArray().length)); - }); - nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); - - if (kinesisEndSequenceNumber != null && kinesisEndSequenceNumber.compareTo(nextStartSequenceNumber) <= 0) { - break; - } - - if (recordList.size() >= _numMaxRecordsToFetch) { - break; - } - } - - if (getRecordsResponse.hasChildShards() && 
!getRecordsResponse.childShards().isEmpty()) { - //This statement returns true only when end of current shard has reached. - // hasChildShards only checks if the childShard is null and is a valid instance. - isEndOfShard = true; - break; - } - - shardIterator = getRecordsResponse.nextShardIterator(); - - if (Thread.interrupted()) { - break; - } - - // Kinesis enforces a limit of 5 .getRecords request per second on each shard from AWS end - // Beyond this limit we start getting ProvisionedThroughputExceededException which affect the ingestion - if (requestSentTime == currentWindow) { - currentWindowRequests++; - } else if (requestSentTime > currentWindow) { - currentWindow = requestSentTime; - currentWindowRequests = 0; - } - - if (currentWindowRequests >= _rpsLimit) { - try { - Thread.sleep(SLEEP_TIME_BETWEEN_REQUESTS); - } catch (InterruptedException e) { - LOGGER.debug("Sleep interrupted while rate limiting Kinesis requests", e); - break; - } + public KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) { + KinesisPartitionGroupOffset startOffset = (KinesisPartitionGroupOffset) startMsgOffset; + String shardId = startOffset.getShardId(); + String startSequenceNumber = startOffset.getSequenceNumber(); + + // Kinesis enforces a limit of 5 getRecords request per second on each shard from AWS end + // Beyond this limit we start getting ProvisionedThroughputExceededException which affect the ingestion + long currentTimeMs = System.currentTimeMillis(); + int currentTimeSeconds = (int) TimeUnit.MILLISECONDS.toSeconds(currentTimeMs); + if (currentTimeSeconds == _currentSecond) { + if (_numRequestsInCurrentSecond == _config.getRpsLimit()) { + try { + Thread.sleep(1000 - (currentTimeMs % 1000)); Review Comment: Since we are not using a separate executor now, is it fine to invoke sleep here and stop the consume thread? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org