KKcorps commented on code in PR #12806: URL: https://github.com/apache/pinot/pull/12806#discussion_r1584605569
########## pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java: ########## @@ -69,134 +61,77 @@ public KinesisConsumer(KinesisConfig config, KinesisClient kinesisClient) { super(config, kinesisClient); } - /** - * Fetch records from the Kinesis stream between the start and end KinesisCheckpoint - */ @Override - public KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) { + public synchronized KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) { KinesisPartitionGroupOffset startOffset = (KinesisPartitionGroupOffset) startMsgOffset; - List<BytesStreamMessage> messages = new ArrayList<>(); - Future<KinesisMessageBatch> kinesisFetchResultFuture = - _executorService.submit(() -> getResult(startOffset, messages)); - try { - return kinesisFetchResultFuture.get(timeoutMs, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - kinesisFetchResultFuture.cancel(true); - } catch (Exception e) { - // Ignored - } - return buildKinesisMessageBatch(startOffset, messages, false); - } - - private KinesisMessageBatch getResult(KinesisPartitionGroupOffset startOffset, List<BytesStreamMessage> messages) { - try { - String shardId = startOffset.getShardId(); - String shardIterator = getShardIterator(shardId, startOffset.getSequenceNumber()); - boolean endOfShard = false; - long currentWindow = System.currentTimeMillis() / SLEEP_TIME_BETWEEN_REQUESTS; - int currentWindowRequests = 0; - while (shardIterator != null) { - GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build(); - long requestSentTime = System.currentTimeMillis() / 1000; - GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest); - List<Record> records = getRecordsResponse.records(); - if (!records.isEmpty()) { - for (Record record : records) { - messages.add(extractStreamMessage(record, shardId)); - 
} - if (messages.size() >= _config.getNumMaxRecordsToFetch()) { - break; - } - } - - if (getRecordsResponse.hasChildShards() && !getRecordsResponse.childShards().isEmpty()) { - //This statement returns true only when end of current shard has reached. - // hasChildShards only checks if the childShard is null and is a valid instance. - endOfShard = true; - break; - } - - shardIterator = getRecordsResponse.nextShardIterator(); - - if (Thread.interrupted()) { - break; - } - - // Kinesis enforces a limit of 5 .getRecords request per second on each shard from AWS end - // Beyond this limit we start getting ProvisionedThroughputExceededException which affect the ingestion - if (requestSentTime == currentWindow) { - currentWindowRequests++; - } else if (requestSentTime > currentWindow) { - currentWindow = requestSentTime; - currentWindowRequests = 0; - } - - if (currentWindowRequests >= _config.getNumMaxRecordsToFetch()) { - try { - Thread.sleep(SLEEP_TIME_BETWEEN_REQUESTS); - } catch (InterruptedException e) { - LOGGER.debug("Sleep interrupted while rate limiting Kinesis requests", e); - break; - } - } - } - - return buildKinesisMessageBatch(startOffset, messages, endOfShard); - } catch (IllegalStateException e) { - debugOrLogWarning("Illegal state exception, connection is broken", e); - } catch (ProvisionedThroughputExceededException e) { - debugOrLogWarning("The request rate for the stream is too high", e); - } catch (ExpiredIteratorException e) { - debugOrLogWarning("ShardIterator expired while trying to fetch records", e); - } catch (ResourceNotFoundException | InvalidArgumentException e) { - // aws errors - LOGGER.error("Encountered AWS error while attempting to fetch records", e); - } catch (KinesisException e) { - debugOrLogWarning("Encountered unknown unrecoverable AWS exception", e); - throw new RuntimeException(e); - } catch (AbortedException e) { - if (!(e.getCause() instanceof InterruptedException)) { - debugOrLogWarning("Task aborted due to exception", e); - 
} - } catch (Throwable e) { - // non transient errors - LOGGER.error("Unknown fetchRecords exception", e); - throw new RuntimeException(e); - } - return buildKinesisMessageBatch(startOffset, messages, false); - } + String shardId = startOffset.getShardId(); + String startSequenceNumber = startOffset.getSequenceNumber(); - private void debugOrLogWarning(String message, Throwable throwable) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(message, throwable); + // Get the shard iterator + String shardIterator; + if (startSequenceNumber.equals(_nextStartSequenceNumber)) { + shardIterator = _nextShardIterator; Review Comment: We will need to handle the case here where `_nextShardIterator` has expired (a Kinesis shard iterator is only valid for 5 minutes after it is returned). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org