This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/sharded_consumer_type_support_with_kinesis by this push: new c35d4a6 Consumer tweaks to get it working c35d4a6 is described below commit c35d4a603359d8034da92f93a8b6e6e000ea542c Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Mon Jan 4 18:01:52 2021 -0800 Consumer tweaks to get it working --- .../protocols/SegmentCompletionProtocol.java | 19 ++++++++++++---- .../plugin/stream/kinesis/KinesisConsumer.java | 7 ++++-- .../plugin/stream/kinesis/KinesisRecordsBatch.java | 16 ++++++-------- .../org/apache/pinot/spi/stream/MessageBatch.java | 2 ++ .../pinot/spi/stream/PartitionGroupMetadata.java | 25 +++++++++++----------- 5 files changed, 41 insertions(+), 28 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java index 04f300b..dd1330d 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java @@ -24,6 +24,9 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import org.apache.pinot.spi.utils.JsonUtils; @@ -180,6 +183,15 @@ public class SegmentCompletionProtocol { } public String getUrl(String hostPort, String protocol) { + String streamPartitionMsgOffset; + try { + streamPartitionMsgOffset = _params.getStreamPartitionMsgOffset() == null ? null : + URLEncoder.encode(_params.getStreamPartitionMsgOffset(), StandardCharsets.UTF_8.toString()); + } catch (UnsupportedEncodingException e) { + throw new IllegalStateException( + "Caught exception when encoding streamPartitionMsgOffset string: " + _params.getStreamPartitionMsgOffset(), + e); + } return protocol + "://" + hostPort + "/" + _msgType + "?" + PARAM_SEGMENT_NAME + "=" + _params.getSegmentName() + "&" + PARAM_OFFSET + "=" + _params.getOffset() + "&" + PARAM_INSTANCE_ID + "=" + _params.getInstanceId() + ( _params.getReason() == null ? "" : ("&" + PARAM_REASON + "=" + _params.getReason())) + ( @@ -190,10 +202,9 @@ public class SegmentCompletionProtocol { + (_params.getSegmentSizeBytes() <= 0 ? "" : ("&" + PARAM_SEGMENT_SIZE_BYTES + "=" + _params.getSegmentSizeBytes())) + (_params.getNumRows() <= 0 ? "" : ("&" + PARAM_ROW_COUNT + "=" + _params.getNumRows())) + (_params.getSegmentLocation() == null ? "" - : ("&" + PARAM_SEGMENT_LOCATION + "=" + _params.getSegmentLocation())) - + (_params.getStreamPartitionMsgOffset() == null ? "" - : ("&" + PARAM_STREAM_PARTITION_MSG_OFFSET + "=" + _params.getStreamPartitionMsgOffset())) - ; + : ("&" + PARAM_SEGMENT_LOCATION + "=" + _params.getSegmentLocation())) + ( + streamPartitionMsgOffset == null ? "" + : ("&" + PARAM_STREAM_PARTITION_MSG_OFFSET + "=" + streamPartitionMsgOffset)); } public static class Params { diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index 8ed3de7..a97f3dc 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -59,13 +59,13 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti } @Override - public KinesisRecordsBatch fetchMessages(Checkpoint start, Checkpoint end, int timeout) { + public KinesisRecordsBatch fetchMessages(Checkpoint start, Checkpoint end, int timeoutMs) { List<Record> recordList = new ArrayList<>(); Future<KinesisRecordsBatch> kinesisFetchResultFuture = _executorService.submit(() -> getResult(start, end, recordList)); try { - return kinesisFetchResultFuture.get(timeout, TimeUnit.MILLISECONDS); + return kinesisFetchResultFuture.get(timeoutMs, TimeUnit.MILLISECONDS); } catch (Exception e) { return handleException((KinesisCheckpoint) start, recordList); } @@ -127,6 +127,9 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti } return new KinesisRecordsBatch(recordList, next.getKey()); + } catch (IllegalStateException e) { + LOG.warn("Illegal state exception, connection is broken", e); + return handleException(kinesisStartCheckpoint, recordList); } catch (ProvisionedThroughputExceededException e) { LOG.warn("The request rate for the stream is too high", e); return handleException(kinesisStartCheckpoint, recordList); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java index fb4bfb3..fdc883b 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java @@ -18,9 +18,11 @@ */ package org.apache.pinot.plugin.stream.kinesis; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; import org.apache.pinot.spi.stream.MessageBatch; import org.apache.pinot.spi.stream.RowMetadata; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; @@ -28,8 +30,8 @@ import software.amazon.awssdk.services.kinesis.model.Record; public class KinesisRecordsBatch implements MessageBatch<byte[]> { - private List<Record> _recordList; - private String _shardId; + private final List<Record> _recordList; + private final String _shardId; public KinesisRecordsBatch(List<Record> recordList, String shardId) { _recordList = recordList; @@ -43,12 +45,11 @@ public class KinesisRecordsBatch implements MessageBatch<byte[]> { @Override public byte[] getMessageAtIndex(int index) { - return _recordList.get(index).data().asByteBuffer().array(); + return _recordList.get(index).data().asByteArray(); } - @Override public int getMessageOffsetAtIndex(int index) { - return _recordList.get(index).data().asByteBuffer().arrayOffset(); + return ByteBuffer.wrap(_recordList.get(index).data().asByteArray()).arrayOffset(); } @Override @@ -57,11 +58,6 @@ public class KinesisRecordsBatch implements MessageBatch<byte[]> { } @Override - public RowMetadata getMetadataAtIndex(int index) { - throw new UnsupportedOperationException(); - } - - @Override public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index) { Map<String, String> shardToSequenceMap = new HashMap<>(); shardToSequenceMap.put(_shardId, _recordList.get(index).sequenceNumber()); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java index 3052b9e..5af72c0 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.spi.stream; +import javax.annotation.Nullable; import org.apache.pinot.spi.annotations.InterfaceAudience; import org.apache.pinot.spi.annotations.InterfaceStability; @@ -61,6 +62,7 @@ public interface MessageBatch<T> { * Returns the metadata associated with the message at a particular index. This typically includes the timestamp * when the message was ingested by the upstream stream-provider and other relevant metadata. */ + @Nullable default RowMetadata getMetadataAtIndex(int index) { return null; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java index 7c4e3ef..aaf20b6 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java @@ -36,18 +36,7 @@ public class PartitionGroupMetadata { _sequenceNumber = sequenceNumber; _startCheckpoint = startCheckpoint; _endCheckpoint = endCheckpoint; - } - - public void setSequenceNumber(int sequenceNumber) { - _sequenceNumber = sequenceNumber; - } - - public void setStartCheckpoint(String startCheckpoint) { - _startCheckpoint = startCheckpoint; - } - - public void setEndCheckpoint(String endCheckpoint) { - _endCheckpoint = endCheckpoint; + _status = status; } public int getPartitionGroupId() { @@ -58,14 +47,26 @@ public class PartitionGroupMetadata { return _sequenceNumber; } + public void setSequenceNumber(int sequenceNumber) { + _sequenceNumber = sequenceNumber; + } + public String getStartCheckpoint() { return _startCheckpoint; } + public void setStartCheckpoint(String startCheckpoint) { + _startCheckpoint = startCheckpoint; + } + public String getEndCheckpoint() { return _endCheckpoint; } + public void setEndCheckpoint(String endCheckpoint) { + _endCheckpoint = endCheckpoint; + } + public String getStatus() { return _status; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org