This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 760ba067ab25a3dfaefc6a0534c53ea0f5d62672 Author: KKcorps <kharekar...@gmail.com> AuthorDate: Sun Dec 20 23:41:06 2020 +0530 Handle exceptions --- .../plugin/stream/kinesis/KinesisConsumer.java | 59 +++++++++++++++++----- 1 file changed, 47 insertions(+), 12 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index 910b9ee..dfd6cda 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -32,12 +32,18 @@ import org.apache.pinot.spi.stream.v2.Checkpoint; import org.apache.pinot.spi.stream.v2.ConsumerV2; import org.apache.pinot.spi.stream.v2.FetchResult; import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; +import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException; import software.amazon.awssdk.services.kinesis.model.KinesisException; +import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException; import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; //TODO: Handle exceptions and timeout @@ -46,6 +52,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume Integer _maxRecords; String _shardId; ExecutorService _executorService; + private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class); public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) { super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion()); @@ -58,13 +65,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume @Override public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) { - Future<KinesisFetchResult> kinesisFetchResultFuture = _executorService.submit(new Callable<KinesisFetchResult>() { - @Override - public KinesisFetchResult call() - throws Exception { - return getResult(start, end); - } - }); + Future<KinesisFetchResult> kinesisFetchResultFuture = _executorService.submit(() -> getResult(start, end)); try { return kinesisFetchResultFuture.get(timeout, TimeUnit.MILLISECONDS); @@ -74,13 +75,13 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume } private KinesisFetchResult getResult(Checkpoint start, Checkpoint end) { + List<Record> recordList = new ArrayList<>(); + KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start; + try { - KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start; String shardIterator = getShardIterator(kinesisStartCheckpoint); - List<Record> recordList = new ArrayList<>(); - String kinesisEndSequenceNumber = null; if (end != null) { @@ -119,8 +120,42 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList); return kinesisFetchResult; - }catch (KinesisException e){ - return null; + }catch (ProvisionedThroughputExceededException e) { + LOG.warn( + "The request rate for the stream is too high" + , e); + return handleException(kinesisStartCheckpoint, recordList); + } + catch (ExpiredIteratorException e) { + LOG.warn( + "ShardIterator expired while trying to fetch records",e + ); + return handleException(kinesisStartCheckpoint, recordList); + } + catch (ResourceNotFoundException | InvalidArgumentException e) { + // aws errors + LOG.error("Encountered AWS error while attempting to fetch records", e); + return handleException(kinesisStartCheckpoint, recordList); + } + catch (KinesisException e) { + LOG.warn("Encountered unknown unrecoverable AWS exception", e); + throw new RuntimeException(e); + } + catch (Throwable e) { + // non transient errors + LOG.error("Unknown fetchRecords exception", e); + throw new RuntimeException(e); + } + } + + private KinesisFetchResult handleException(KinesisCheckpoint start, List<Record> recordList) { + if(recordList.size() > 0){ + String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); + KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber); + return new KinesisFetchResult(kinesisCheckpoint, recordList); + }else{ + KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(start.getSequenceNumber()); + return new KinesisFetchResult(kinesisCheckpoint, recordList); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org