This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7326d4a  Fixed backward incompatibility for existing stream 
implementations (#5549)
7326d4a is described below

commit 7326d4ad7a7db1d07d760f6ace1b59fa4740a8d0
Author: Subbu Subramaniam <mcvsu...@users.noreply.github.com>
AuthorDate: Fri Jun 12 10:46:02 2020 -0700

    Fixed backward incompatibility for existing stream implementations (#5549)
    
    PR #5542 introduced an incompatibility in the interfaces for existing
    stream implementations (such as EventHub or other Kafka implementations).
    Added default implementations so that those streams can still
    compile with Pinot.
---
 .../data/manager/realtime/LLRealtimeSegmentDataManager.java  |  3 +--
 .../realtime/impl/fakestream/FakeStreamMessageBatch.java     |  6 +++++-
 .../plugin/stream/kafka09/SimpleConsumerMessageBatch.java    |  7 ++++++-
 .../pinot/plugin/stream/kafka20/KafkaMessageBatch.java       |  7 ++++++-
 .../main/java/org/apache/pinot/spi/stream/MessageBatch.java  | 12 +++++++++++-
 .../org/apache/pinot/spi/stream/StreamMetadataProvider.java  |  7 +++++--
 6 files changed, 34 insertions(+), 8 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 05f9340..6dcef0a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -66,7 +66,6 @@ import 
org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
@@ -506,7 +505,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
                 realtimeRowsDroppedMeter);
       }
 
-      _currentOffset = 
messagesAndOffsets.getNextStreamMessageOffsetAtIndex(index);
+      _currentOffset = 
messagesAndOffsets.getNextStreamParitionMsgOffsetAtIndex(index);
       _numRowsIndexed = _realtimeSegment.getNumDocsIndexed();
       _numRowsConsumed++;
       streamMessageCount++;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java
index 650d75a..59dd753 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java
@@ -52,7 +52,11 @@ public class FakeStreamMessageBatch implements 
MessageBatch<byte[]> {
     return _messageBytes.get(index).length;
   }
 
-  public StreamPartitionMsgOffset getNextStreamMessageOffsetAtIndex(int index) 
{
+  public long getNextStreamMessageOffsetAtIndex(int index) {
+    throw new UnsupportedOperationException("This method is deprecated");
+  }
+
+  public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int 
index) {
     return new LongMsgOffset(_messageOffsets.get(index) + 1);
   }
 }
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/SimpleConsumerMessageBatch.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/SimpleConsumerMessageBatch.java
index b2125ff..ab333de 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/SimpleConsumerMessageBatch.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/SimpleConsumerMessageBatch.java
@@ -51,7 +51,12 @@ public class SimpleConsumerMessageBatch implements 
MessageBatch<byte[]> {
     return messageList.get(index).message().payloadSize();
   }
 
-  public StreamPartitionMsgOffset getNextStreamMessageOffsetAtIndex(int index) 
{
+  @Override
+  public long getNextStreamMessageOffsetAtIndex(int index) {
+    throw new UnsupportedOperationException("This method is deprecated");
+  }
+
+  public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int 
index) {
     return new LongMsgOffset(messageList.get(index).nextOffset());
   }
 }
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
index e5678ce..82b25a3 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
@@ -59,7 +59,12 @@ public class KafkaMessageBatch implements 
MessageBatch<byte[]> {
   }
 
   @Override
-  public StreamPartitionMsgOffset getNextStreamMessageOffsetAtIndex(int index) 
{
+  public long getNextStreamMessageOffsetAtIndex(int index) {
+    throw new UnsupportedOperationException("This method is deprecated");
+  }
+
+  @Override
+  public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int 
index) {
     return new LongMsgOffset(messageList.get(index).getNextOffset());
   }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index 68c5c2e..3052b9e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -70,5 +70,15 @@ public interface MessageBatch<T> {
    * @param index
    * @return
    */
-  StreamPartitionMsgOffset getNextStreamMessageOffsetAtIndex(int index);
+  @Deprecated
+  long getNextStreamMessageOffsetAtIndex(int index);
+
+  /**
+   * Returns the offset of the next message.
+   * @param index
+   * @return
+   */
+  default StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int 
index) {
+    return new LongMsgOffset(getNextStreamMessageOffsetAtIndex(index));
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 8b8176d..557ffc4 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -48,6 +48,9 @@ public interface StreamMetadataProvider extends Closeable {
    * @return
    * @throws java.util.concurrent.TimeoutException
    */
-  StreamPartitionMsgOffset fetchStreamPartitionOffset(@Nonnull OffsetCriteria 
offsetCriteria, long timeoutMillis)
-      throws java.util.concurrent.TimeoutException;
+  default StreamPartitionMsgOffset fetchStreamPartitionOffset(@Nonnull 
OffsetCriteria offsetCriteria, long timeoutMillis)
+      throws java.util.concurrent.TimeoutException {
+    long offset = fetchPartitionOffset(offsetCriteria, timeoutMillis);
+    return new LongMsgOffset(offset);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to