This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c24f1cc  Enhance Pinot streaming consumer interface (#8047)
c24f1cc is described below

commit c24f1cc9f712bd7e8c804cd50198af450b95b0dc
Author: Navina Ramesh <navi.trin...@gmail.com>
AuthorDate: Fri Jan 21 09:22:12 2022 -0800

    Enhance Pinot streaming consumer interface (#8047)
    
    * Introduces new interface methods - start, commit and rollback in PartitionGroupConsumer;
    
    bumping up grpc version to 1.41.0
    
    removed unnecessary final class members; docs for PartitionGroupConsumer APIs
    
    * incorporating changes from OSS discussions
    
    * Addressing comments from PR
    
    * bumping up presto-pinot-driver to grpc 1.41.0 version
    
    Co-authored-by: Navina Ramesh <nav...@apache.org>
---
 pinot-connectors/presto-pinot-driver/pom.xml       |  2 +-
 .../realtime/LLRealtimeSegmentDataManager.java     | 15 ++++----
 .../pinot/spi/stream/PartitionGroupConsumer.java   | 40 ++++++++++++++++++++--
 .../pinot/spi/stream/StreamDataProducer.java       |  2 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |  6 ++--
 .../pinot/tools/streams/MeetupRsvpJsonStream.java  |  4 +--
 .../pinot/tools/streams/MeetupRsvpStream.java      | 12 +++++--
 pom.xml                                            |  2 +-
 8 files changed, 64 insertions(+), 19 deletions(-)

diff --git a/pinot-connectors/presto-pinot-driver/pom.xml b/pinot-connectors/presto-pinot-driver/pom.xml
index 3a2f35e..05d4490 100644
--- a/pinot-connectors/presto-pinot-driver/pom.xml
+++ b/pinot-connectors/presto-pinot-driver/pom.xml
@@ -536,7 +536,7 @@
         <dependency>
             <groupId>io.grpc</groupId>
             <artifactId>grpc-api</artifactId>
-            <version>1.30.0</version>
+            <version>1.41.0</version>
             <exclusions>
                 <exclusion>
                     <groupId>com.google.guava</groupId>
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 5233ff2..378b0e8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -251,7 +251,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private static final int MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS = 31;
 
   private Thread _consumerThread;
-  private final String _streamTopic;
   private final int _partitionGroupId;
   private final PartitionGroupConsumptionStatus _partitionGroupConsumptionStatus;
   final String _clientId;
@@ -664,6 +663,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               break;
             case COMMIT:
               _state = State.COMMITTING;
+              _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
               long buildTimeSeconds = response.getBuildTimeSeconds();
               buildSegmentForCommit(buildTimeSeconds * 1000L);
               if (_segmentBuildDescriptor == null) {
@@ -1232,7 +1232,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _streamConsumerFactory = StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig);
     _streamPartitionMsgOffsetFactory =
         StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig).createStreamMsgOffsetFactory();
-    _streamTopic = _partitionLevelStreamConfig.getTopicName();
+    String streamTopic = _partitionLevelStreamConfig.getTopicName();
     _segmentNameStr = _segmentZKMetadata.getSegmentName();
     _llcSegmentName = llcSegmentName;
     _partitionGroupId = _llcSegmentName.getPartitionGroupId();
@@ -1244,9 +1244,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             _segmentZKMetadata.getStatus().toString());
     _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
     _acquiredConsumerSemaphore = new AtomicBoolean(false);
-    _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _partitionGroupId;
+    _metricKeyName = _tableNameWithType + "-" + streamTopic + "-" + _partitionGroupId;
     _segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
-    _tableStreamName = _tableNameWithType + "_" + _streamTopic;
+    _tableStreamName = _tableNameWithType + "_" + streamTopic;
     _memoryManager = getMemoryManager(realtimeTableDataManager.getConsumerDir(), _segmentNameStr,
         indexLoadingConfig.isRealtimeOffHeapAllocation(), indexLoadingConfig.isDirectRealtimeOffHeapAllocation(),
         serverMetrics);
@@ -1307,7 +1307,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     String consumerDir = realtimeTableDataManager.getConsumerDir();
     RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
         new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType).setSegmentName(_segmentNameStr)
-            .setStreamName(_streamTopic).setSchema(_schema).setTimeColumnName(timeColumnName)
+            .setStreamName(streamTopic).setSchema(_schema).setTimeColumnName(timeColumnName)
             .setCapacity(_segmentMaxRowCount).setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
             .setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
             .setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
@@ -1325,7 +1325,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // Create message decoder
     Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema);
     _messageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
-    _clientId = _streamTopic + "-" + _partitionGroupId;
+    _clientId = streamTopic + "-" + _partitionGroupId;
 
     // Create record transformer
     _recordTransformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema);
@@ -1465,9 +1465,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     if (_partitionGroupConsumer != null) {
       closePartitionGroupConsumer();
     }
-    _segmentLogger.info("Creating new stream consumer, reason: {}", reason);
+    _segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
     _partitionGroupConsumer =
         _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+    _partitionGroupConsumer.start(_partitionGroupConsumptionStatus.getStartOffset());
   }
 
   /**
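
For context, the lifecycle the data manager now drives is: create the consumer, start() it at the start offset from the consumption status, fetch messages in a loop, checkpoint() at commit time, then close. Below is a minimal caller-side sketch of that sequence, assuming an already-built StreamConsumerFactory, client id, and PartitionGroupConsumptionStatus; it is illustrative only, not the actual data-manager code (types are from org.apache.pinot.spi.stream):

// Illustrative lifecycle sketch; `factory`, `clientId` and `status` are
// assumed to be provided by the caller.
void consumeOnce(StreamConsumerFactory factory, String clientId, PartitionGroupConsumptionStatus status)
    throws Exception {
  PartitionGroupConsumer consumer = factory.createPartitionGroupConsumer(clientId, status);
  consumer.start(status.getStartOffset());       // new hook: prepare / initialize the source
  StreamPartitionMsgOffset offset = status.getStartOffset();
  MessageBatch<?> batch = consumer.fetchMessages(offset, null, 5000);
  // ... index the batch and advance `offset` to the batch's last offset ...
  offset = consumer.checkpoint(offset);          // new hook: persist consumption state on commit
  consumer.close();
}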
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
index 735c91d..8e9308e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
@@ -21,14 +21,33 @@ package org.apache.pinot.spi.stream;
 import java.io.Closeable;
 import java.util.concurrent.TimeoutException;
 
-
 /**
  * Consumer interface for consuming from a partition group of a stream
  */
 public interface PartitionGroupConsumer extends Closeable {
+  /**
+   * Starts a stream consumer
+   *
+   * This is useful in cases where starting the consumer involves preparing / initializing the source.
+   * A typical example is that of an asynchronous / non-poll based consumption model where this method will be used to
+   * setup or initialize the consumer to fetch messages from the source stream.
+   *
+   * Poll-based consumers can optionally use this to prefetch metadata from the source.
+   *
+   * This method should be invoked by the caller before trying to invoke
+   * {@link #fetchMessages(StreamPartitionMsgOffset, StreamPartitionMsgOffset, int)}.
+   *
+   * @param startOffset Offset (inclusive) at which the consumption should begin
+   */
+  default void start(StreamPartitionMsgOffset startOffset) {
+
+  }
 
   /**
-   * Fetch messages and offsets from the stream partition group
+   * Return messages from the stream partition group within the specified timeout
+   *
+   * The message may be fetched by actively polling the source or by retrieving from a pre-fetched buffer. This depends
+   * on the implementation.
    *
    * @param startOffset The offset of the first message desired, inclusive
   * @param endOffset The offset of the last message desired, exclusive, or null
@@ -39,4 +58,21 @@ public interface PartitionGroupConsumer extends Closeable {
    */
   MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset endOffset, int timeoutMs)
       throws TimeoutException;
+
+  /**
+   * Checkpoints the consumption state of the stream partition group in the source
+   *
+   * This is useful in streaming systems that require preserving consumption state on the source in order to resume or
+   * replay consumption of data. The consumer implementation is responsible for managing this state.
+   *
+   * The offset returned will be used for offset comparisons within the local server (say, for catching up) and also,
+   * persisted to the ZK segment metadata. Hence, the returned value should be same or equivalent to the lastOffset
+   * provided as input (that is, compareTo of the input and returned offset should be 0).
+   *
+   * @param lastOffset checkpoint the stream at this offset (exclusive)
+   * @return Returns the offset that should be used as the next upcoming offset for the stream partition group
+   */
+  default StreamPartitionMsgOffset checkpoint(StreamPartitionMsgOffset lastOffset) {
+    return lastOffset;
+  }
 }
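
To make the new default hooks concrete, here is a hedged sketch of how an asynchronous, push-based connector might override them. The Subscription type and openSubscription() helper are illustrative assumptions, not part of pinot-spi:

import java.io.IOException;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;

// Hypothetical push-based consumer built on the new hooks: start() opens a
// subscription, checkpoint() persists state on the source, close() tears down.
public abstract class PushBasedConsumer implements PartitionGroupConsumer {
  /** Hypothetical handle to an async subscription on the source stream. */
  protected interface Subscription {
    void commit(StreamPartitionMsgOffset offset);  // persist consumption state on the source
    void cancel();
  }

  private Subscription _subscription;

  @Override
  public void start(StreamPartitionMsgOffset startOffset) {
    // Begin streaming from startOffset (inclusive) into an internal buffer
    _subscription = openSubscription(startOffset);
  }

  @Override
  public StreamPartitionMsgOffset checkpoint(StreamPartitionMsgOffset lastOffset) {
    // Persist state on the source; the returned offset must compare equal to lastOffset
    _subscription.commit(lastOffset);
    return lastOffset;
  }

  @Override
  public void close() throws IOException {
    _subscription.cancel();
  }

  protected abstract Subscription openSubscription(StreamPartitionMsgOffset startOffset);
  // fetchMessages(...) would drain the buffer filled by the subscription
}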
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
index ed19f86..6b3c140 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
@@ -22,7 +22,7 @@ import java.util.Properties;
 
 
 /**
- * StreamDataServerStartable is the interface for stream data sources. E.g. KafkaServerStartable, KinesisServerStarable.
+ * StreamDataProducer is the interface for stream data sources. E.g. KafkaDataProducer.
  */
 public interface StreamDataProducer {
   void init(Properties props);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 0947b2f..4a6f3ff 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -487,9 +487,9 @@ public class CommonConstants {
        */
       public enum CompletionMode {
         // default behavior - if the in memory segment in the non-winner server is equivalent to the committed
-        // segment, then build and
-        // replace, else download
-        DEFAULT, // non-winner servers always download the segment, never build it
+        // segment, then build and replace, else download
+        DEFAULT,
+        // non-winner servers always download the segment, never build it
         DOWNLOAD
       }
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
index 624d277..586bfa7 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
@@ -45,12 +45,12 @@ public class MeetupRsvpJsonStream extends MeetupRsvpStream {
           try {
             JsonNode messageJson = JsonUtils.stringToJsonNode(message);
             String rsvpId = messageJson.get("rsvp_id").asText();
-            _producer.produce("meetupRSVPEvents", rsvpId.getBytes(UTF_8), message.getBytes(UTF_8));
+            _producer.produce(_topicName, rsvpId.getBytes(UTF_8), message.getBytes(UTF_8));
           } catch (Exception e) {
             LOGGER.error("Caught exception while processing the message: {}", message, e);
           }
         } else {
-          _producer.produce("meetupRSVPEvents", message.getBytes(UTF_8));
+          _producer.produce(_topicName, message.getBytes(UTF_8));
         }
       }
     };
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
index fd37117..5aa8abe 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
@@ -40,6 +40,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 public class MeetupRsvpStream {
   protected static final Logger LOGGER = LoggerFactory.getLogger(MeetupRsvpStream.class);
+  private static final String DEFAULT_TOPIC_NAME = "meetupRSVPEvents";
+  protected String _topicName = DEFAULT_TOPIC_NAME;
 
   protected final boolean _partitionByKey;
   protected final StreamDataProducer _producer;
@@ -63,6 +65,12 @@ public class MeetupRsvpStream {
     _producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
   }
 
+  public MeetupRsvpStream(boolean partitionByKey, StreamDataProducer producer, String topicName) {
+    _partitionByKey = partitionByKey;
+    _producer = producer;
+    _topicName = topicName;
+  }
+
   public void run()
       throws Exception {
     _client = ClientManager.createClient();
@@ -117,10 +125,10 @@ public class MeetupRsvpStream {
 
         if (_keepPublishing) {
           if (_partitionByKey) {
-            _producer.produce("meetupRSVPEvents", eventId.getBytes(UTF_8),
+            _producer.produce(_topicName, eventId.getBytes(UTF_8),
                 extractedJson.toString().getBytes(UTF_8));
           } else {
-            _producer.produce("meetupRSVPEvents", extractedJson.toString().getBytes(UTF_8));
+            _producer.produce(_topicName, extractedJson.toString().getBytes(UTF_8));
           }
         }
       } catch (Exception e) {
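
With the new constructor, demo code can direct the stream at any topic. A minimal usage sketch, assuming an already-initialized StreamDataProducer named producer and an arbitrary example topic name:

// Assumed usage of the new constructor; `producer` and "myRsvpTopic" are
// illustrative, not from this commit.
MeetupRsvpStream stream = new MeetupRsvpStream(/* partitionByKey */ true, producer, "myRsvpTopic");
stream.run();  // run() throws Exception; callers handle or declare it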
diff --git a/pom.xml b/pom.xml
index 426936c..1d38cf3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -164,7 +164,7 @@
    TODO: figure out a way to inject kafka dependency instead of explicitly setting the kafka module dependency -->
     <kafka.version>2.0</kafka.version>
     <protobuf.version>3.12.0</protobuf.version>
-    <grpc.version>1.30.0</grpc.version>
+    <grpc.version>1.41.0</grpc.version>
 
     <!-- Checkstyle violation prop.-->
     <checkstyle.violation.severity>warning</checkstyle.violation.severity>
