This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/sharded_consumer_type_support_with_kinesis by this push:
     new c35d4a6  Consumer tweaks to get it working
c35d4a6 is described below

commit c35d4a603359d8034da92f93a8b6e6e000ea542c
Author: Neha Pawar <neha.pawa...@gmail.com>
AuthorDate: Mon Jan 4 18:01:52 2021 -0800

    Consumer tweaks to get it working
---
 .../protocols/SegmentCompletionProtocol.java       | 19 ++++++++++++----
 .../plugin/stream/kinesis/KinesisConsumer.java     |  7 ++++--
 .../plugin/stream/kinesis/KinesisRecordsBatch.java | 16 ++++++--------
 .../org/apache/pinot/spi/stream/MessageBatch.java  |  2 ++
 .../pinot/spi/stream/PartitionGroupMetadata.java   | 25 +++++++++++-----------
 5 files changed, 41 insertions(+), 28 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index 04f300b..dd1330d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -24,6 +24,9 @@ import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.spi.utils.JsonUtils;
 
@@ -180,6 +183,15 @@ public class SegmentCompletionProtocol {
     }
 
     public String getUrl(String hostPort, String protocol) {
+      String streamPartitionMsgOffset;
+      try {
+        streamPartitionMsgOffset = _params.getStreamPartitionMsgOffset() == null ? null :
+            URLEncoder.encode(_params.getStreamPartitionMsgOffset(), StandardCharsets.UTF_8.toString());
+      } catch (UnsupportedEncodingException e) {
+        throw new IllegalStateException(
+            "Caught exception when encoding streamPartitionMsgOffset string: " + _params.getStreamPartitionMsgOffset(),
+            e);
+      }
       return protocol + "://" + hostPort + "/" + _msgType + "?" + PARAM_SEGMENT_NAME + "=" + _params.getSegmentName()
           + "&" + PARAM_OFFSET + "=" + _params.getOffset() + "&" + PARAM_INSTANCE_ID + "=" + _params.getInstanceId() + (
           _params.getReason() == null ? "" : ("&" + PARAM_REASON + "=" + _params.getReason())) + (
@@ -190,10 +202,9 @@ public class SegmentCompletionProtocol {
           + (_params.getSegmentSizeBytes() <= 0 ? ""
           : ("&" + PARAM_SEGMENT_SIZE_BYTES + "=" + _params.getSegmentSizeBytes())) + (_params.getNumRows() <= 0 ? ""
           : ("&" + PARAM_ROW_COUNT + "=" + _params.getNumRows())) + (_params.getSegmentLocation() == null ? ""
-          : ("&" + PARAM_SEGMENT_LOCATION + "=" + _params.getSegmentLocation()))
-          + (_params.getStreamPartitionMsgOffset() == null ? ""
-          : ("&" + PARAM_STREAM_PARTITION_MSG_OFFSET + "=" + _params.getStreamPartitionMsgOffset()))
-          ;
+          : ("&" + PARAM_SEGMENT_LOCATION + "=" + _params.getSegmentLocation())) + (
+          streamPartitionMsgOffset == null ? ""
+              : ("&" + PARAM_STREAM_PARTITION_MSG_OFFSET + "=" + streamPartitionMsgOffset));
     }
 
     public static class Params {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 8ed3de7..a97f3dc 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -59,13 +59,13 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
   }
 
   @Override
-  public KinesisRecordsBatch fetchMessages(Checkpoint start, Checkpoint end, int timeout) {
+  public KinesisRecordsBatch fetchMessages(Checkpoint start, Checkpoint end, int timeoutMs) {
     List<Record> recordList = new ArrayList<>();
     Future<KinesisRecordsBatch> kinesisFetchResultFuture =
         _executorService.submit(() -> getResult(start, end, recordList));
 
     try {
-      return kinesisFetchResultFuture.get(timeout, TimeUnit.MILLISECONDS);
+      return kinesisFetchResultFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
     } catch (Exception e) {
       return handleException((KinesisCheckpoint) start, recordList);
     }
@@ -127,6 +127,9 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
       }
 
       return new KinesisRecordsBatch(recordList, next.getKey());
+    } catch (IllegalStateException e) {
+      LOG.warn("Illegal state exception, connection is broken", e);
+      return handleException(kinesisStartCheckpoint, recordList);
     } catch (ProvisionedThroughputExceededException e) {
       LOG.warn("The request rate for the stream is too high", e);
       return handleException(kinesisStartCheckpoint, recordList);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
index fb4bfb3..fdc883b 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
@@ -18,9 +18,11 @@
  */
 package org.apache.pinot.plugin.stream.kinesis;
 
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.RowMetadata;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -28,8 +30,8 @@ import software.amazon.awssdk.services.kinesis.model.Record;
 
 
 public class KinesisRecordsBatch implements MessageBatch<byte[]> {
-  private List<Record> _recordList;
-  private String _shardId;
+  private final List<Record> _recordList;
+  private final String _shardId;
 
   public KinesisRecordsBatch(List<Record> recordList, String shardId) {
     _recordList = recordList;
@@ -43,12 +45,11 @@ public class KinesisRecordsBatch implements MessageBatch<byte[]> {
 
   @Override
   public byte[] getMessageAtIndex(int index) {
-    return _recordList.get(index).data().asByteBuffer().array();
+    return _recordList.get(index).data().asByteArray();
   }
-
   @Override
   public int getMessageOffsetAtIndex(int index) {
-    return _recordList.get(index).data().asByteBuffer().arrayOffset();
+    return ByteBuffer.wrap(_recordList.get(index).data().asByteArray()).arrayOffset();
   }
 
   @Override
@@ -57,11 +58,6 @@ public class KinesisRecordsBatch implements MessageBatch<byte[]> {
   }
 
   @Override
-  public RowMetadata getMetadataAtIndex(int index) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
   public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index) {
     Map<String, String> shardToSequenceMap = new HashMap<>();
     shardToSequenceMap.put(_shardId, _recordList.get(index).sequenceNumber());
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index 3052b9e..5af72c0 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.spi.stream;
 
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
 
@@ -61,6 +62,7 @@ public interface MessageBatch<T> {
    * Returns the metadata associated with the message at a particular index. This typically includes the timestamp
    * when the message was ingested by the upstream stream-provider and other relevant metadata.
    */
+  @Nullable
   default RowMetadata getMetadataAtIndex(int index) {
     return null;
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
index 7c4e3ef..aaf20b6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
@@ -36,18 +36,7 @@ public class PartitionGroupMetadata {
     _sequenceNumber = sequenceNumber;
     _startCheckpoint = startCheckpoint;
     _endCheckpoint = endCheckpoint;
-  }
-
-  public void setSequenceNumber(int sequenceNumber) {
-    _sequenceNumber = sequenceNumber;
-  }
-
-  public void setStartCheckpoint(String startCheckpoint) {
-    _startCheckpoint = startCheckpoint;
-  }
-
-  public void setEndCheckpoint(String endCheckpoint) {
-    _endCheckpoint = endCheckpoint;
+    _status = status;
   }
 
   public int getPartitionGroupId() {
@@ -58,14 +47,26 @@ public class PartitionGroupMetadata {
     return _sequenceNumber;
   }
 
+  public void setSequenceNumber(int sequenceNumber) {
+    _sequenceNumber = sequenceNumber;
+  }
+
   public String getStartCheckpoint() {
     return _startCheckpoint;
   }
 
+  public void setStartCheckpoint(String startCheckpoint) {
+    _startCheckpoint = startCheckpoint;
+  }
+
   public String getEndCheckpoint() {
     return _endCheckpoint;
   }
 
+  public void setEndCheckpoint(String endCheckpoint) {
+    _endCheckpoint = endCheckpoint;
+  }
+
   public String getStatus() {
     return _status;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to