This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 6c8af2b54c72407bfa8f91b2403dc317b01cc8cb Author: KKcorps <kharekar...@gmail.com> AuthorDate: Sun Dec 20 01:27:05 2020 +0530 Reformat code --- .../plugin/stream/kinesis/KinesisCheckpoint.java | 3 +-- .../stream/kinesis/KinesisConnectionHandler.java | 14 ++++++----- .../plugin/stream/kinesis/KinesisConsumer.java | 28 ++++++++++++---------- .../stream/kinesis/KinesisConsumerFactory.java | 3 ++- .../plugin/stream/kinesis/KinesisFetchResult.java | 6 ++--- .../kinesis/KinesisPartitionGroupMetadataMap.java | 5 ++-- 6 files changed, 32 insertions(+), 27 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java index aa80b17..89043ea 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java @@ -6,7 +6,7 @@ import org.apache.pinot.spi.stream.v2.Checkpoint; public class KinesisCheckpoint implements Checkpoint { String _sequenceNumber; - public KinesisCheckpoint(String sequenceNumber){ + public KinesisCheckpoint(String sequenceNumber) { _sequenceNumber = sequenceNumber; } @@ -24,5 +24,4 @@ public class KinesisCheckpoint implements Checkpoint { //TODO: Implement SerDe return new KinesisCheckpoint(new String(blob)); } - } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java index d8888fa..554cca6 100644 --- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java @@ -24,19 +24,21 @@ public class KinesisConnectionHandler { private String _awsRegion; KinesisClient _kinesisClient; - public KinesisConnectionHandler(){ + public KinesisConnectionHandler() { } - public KinesisConnectionHandler(String stream, String awsRegion){ + public KinesisConnectionHandler(String stream, String awsRegion) { _stream = stream; _awsRegion = awsRegion; - _kinesisClient = KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create()).build(); + _kinesisClient = + KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create()) + .build(); } - public List<Shard> getShards(){ - ListShardsResponse listShardsResponse = _kinesisClient.listShards(ListShardsRequest.builder().streamName(_stream).build()); + public List<Shard> getShards() { + ListShardsResponse listShardsResponse = + _kinesisClient.listShards(ListShardsRequest.builder().streamName(_stream).build()); return listShardsResponse.shards(); } - } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index d896d67..1181d14 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -39,7 +39,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume String kinesisEndSequenceNumber = null; - 
if(end != null) { + if (end != null) { KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end; kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber(); } @@ -47,32 +47,34 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume String nextStartSequenceNumber = null; Long startTimestamp = System.currentTimeMillis(); - while(shardIterator != null && !isTimedOut(startTimestamp, timeout)){ + while (shardIterator != null && !isTimedOut(startTimestamp, timeout)) { GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build(); GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest); - if(getRecordsResponse.records().size() > 0){ + if (getRecordsResponse.records().size() > 0) { recordList.addAll(getRecordsResponse.records()); nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); - if(kinesisEndSequenceNumber != null && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0 ){ + if (kinesisEndSequenceNumber != null + && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) { nextStartSequenceNumber = kinesisEndSequenceNumber; break; } - if(recordList.size() >= _maxRecords) break; + if (recordList.size() >= _maxRecords) { + break; + } } shardIterator = getRecordsResponse.nextShardIterator(); } - if(nextStartSequenceNumber == null && recordList.size() > 0){ + if (nextStartSequenceNumber == null && recordList.size() > 0) { nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); } KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber); - KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, - recordList); + KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList); return kinesisFetchResult; } @@ -80,14 +82,16 @@ public class 
KinesisConsumer extends KinesisConnectionHandler implements Consume private String getShardIterator(KinesisCheckpoint kinesisStartCheckpoint) { GetShardIteratorResponse getShardIteratorResponse; - if(kinesisStartCheckpoint.getSequenceNumber() != null) { + if (kinesisStartCheckpoint.getSequenceNumber() != null) { String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber(); getShardIteratorResponse = _kinesisClient.getShardIterator( - GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId).shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) + GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId) + .shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) .startingSequenceNumber(kinesisStartSequenceNumber).build()); - } else{ + } else { getShardIteratorResponse = _kinesisClient.getShardIterator( - GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream).shardIteratorType(ShardIteratorType.LATEST).build()); + GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream) + .shardIteratorType(ShardIteratorType.LATEST).build()); } return getShardIteratorResponse.shardIterator(); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java index 0608118..5e06a01 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java @@ -21,7 +21,8 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 { @Override public PartitionGroupMetadataMap getPartitionGroupsMetadata( PartitionGroupMetadataMap currentPartitionGroupsMetadata) { - return new 
KinesisPartitionGroupMetadataMap(_streamConfig.getTopicName(), _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1")); + return new KinesisPartitionGroupMetadataMap(_streamConfig.getTopicName(), + _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1")); } @Override diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java index 2996b28..2801a09 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java @@ -11,9 +11,9 @@ public class KinesisFetchResult implements FetchResult<Record> { private final KinesisCheckpoint _kinesisCheckpoint; private final List<Record> _recordList; - public KinesisFetchResult(KinesisCheckpoint kinesisCheckpoint, List<Record> recordList){ - _kinesisCheckpoint = kinesisCheckpoint; - _recordList = recordList; + public KinesisFetchResult(KinesisCheckpoint kinesisCheckpoint, List<Record> recordList) { + _kinesisCheckpoint = kinesisCheckpoint; + _recordList = recordList; } @Override diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java index 700ec3f..05d95de 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java +++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java @@ -12,10 +12,10 @@ import software.amazon.awssdk.services.kinesis.model.Shard; public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler implements PartitionGroupMetadataMap { private final List<PartitionGroupMetadata> _stringPartitionGroupMetadataIndex = new ArrayList<>(); - public KinesisPartitionGroupMetadataMap(String stream, String awsRegion){ + public KinesisPartitionGroupMetadataMap(String stream, String awsRegion) { super(stream, awsRegion); List<Shard> shardList = getShards(); - for(Shard shard : shardList){ + for (Shard shard : shardList) { String startSequenceNumber = shard.sequenceNumberRange().startingSequenceNumber(); String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber(); KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream, awsRegion); @@ -34,5 +34,4 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i public PartitionGroupMetadata getPartitionGroupMetadata(int index) { return _stringPartitionGroupMetadataIndex.get(index); } - } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org