This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 31c64a0cc138146dc59c1ce665f3ca72fd7b52f9
Author: Neha Pawar <neha.pawa...@gmail.com>
AuthorDate: Thu Dec 31 17:19:24 2020 -0800

    An attempt at server-side changes
---
 .../realtime/LLRealtimeSegmentDataManager.java     | 22 +++++++++++++---------
 .../org/apache/pinot/spi/stream/FetchResult.java   |  5 +----
 .../pinot/spi/stream/PartitionGroupConsumer.java   |  2 +-
 3 files changed, 15 insertions(+), 14 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 0938251..054676e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -71,8 +71,10 @@ import 
org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.FetchResult;
 import org.apache.pinot.spi.stream.MessageBatch;
-import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 import org.apache.pinot.spi.stream.PermanentConsumerException;
 import org.apache.pinot.spi.stream.RowMetadata;
@@ -249,10 +251,11 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
   private Thread _consumerThread;
   private final String _streamTopic;
   private final int _partitionGroupId;
+  private final PartitionGroupMetadata _partitionGroupMetadata;
   final String _clientId;
   private final LLCSegmentName _llcSegmentName;
   private final RecordTransformer _recordTransformer;
-  private PartitionLevelConsumer _partitionLevelConsumer = null;
+  private PartitionGroupConsumer _partitionGroupConsumer = null;
   private StreamMetadataProvider _streamMetadataProvider = null;
   private final File _resourceTmpDir;
   private final String _tableNameWithType;
@@ -381,12 +384,10 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
       // Update _currentOffset upon return from this method
       MessageBatch messageBatch;
       try {
-        messageBatch = _partitionLevelConsumer
+        FetchResult fetchResult = _partitionGroupConsumer
             .fetchMessages(_currentOffset, null, 
_partitionLevelStreamConfig.getFetchTimeoutMillis());
+        messageBatch = fetchResult.getMessages();
         consecutiveErrorCount = 0;
-      } catch (TimeoutException e) {
-        handleTransientStreamErrors(e);
-        continue;
       } catch (TransientConsumerException e) {
         handleTransientStreamErrors(e);
         continue;
@@ -899,7 +900,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
 
   private void closePartitionLevelConsumer() {
     try {
-      _partitionLevelConsumer.close();
+      _partitionGroupConsumer.close();
     } catch (Exception e) {
       segmentLogger.warn("Could not close stream consumer", e);
     }
@@ -1131,6 +1132,9 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     _segmentNameStr = _segmentZKMetadata.getSegmentName();
     _llcSegmentName = llcSegmentName;
     _partitionGroupId = _llcSegmentName.getPartitionGroupId();
+    _partitionGroupMetadata = new PartitionGroupMetadata(_partitionGroupId, 
_llcSegmentName.getSequenceNumber(),
+        _segmentZKMetadata.getStartOffset(), _segmentZKMetadata.getEndOffset(),
+        _segmentZKMetadata.getStatus().toString());
     _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
     _acquiredConsumerSemaphore = new AtomicBoolean(false);
     _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + 
_partitionGroupId;
@@ -1311,11 +1315,11 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
    * @param reason
    */
   private void makeStreamConsumer(String reason) {
-    if (_partitionLevelConsumer != null) {
+    if (_partitionGroupConsumer != null) {
       closePartitionLevelConsumer();
     }
     segmentLogger.info("Creating new stream consumer, reason: {}", reason);
-    _partitionLevelConsumer = 
_streamConsumerFactory.createPartitionLevelConsumer(_clientId, 
_partitionGroupId);
+    _partitionGroupConsumer = 
_streamConsumerFactory.createPartitionGroupConsumer(_partitionGroupMetadata);
   }
 
   /**
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java
index b0ed6e5..7e8a911 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java
@@ -18,10 +18,7 @@
  */
 package org.apache.pinot.spi.stream;
 
-import java.util.List;
-
-
 public interface FetchResult<T> {
   Checkpoint getLastCheckpoint();
-  List<T> getMessages();
+  MessageBatch<T> getMessages();
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
index e096e67..bbbdaad 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
@@ -22,5 +22,5 @@ import java.io.Closeable;
 
 
 public interface PartitionGroupConsumer extends Closeable {
-  FetchResult fetch(Checkpoint start, Checkpoint end, long timeout);
+  FetchResult fetchMessages(Checkpoint start, Checkpoint end, long timeout);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to