This is an automated email from the ASF dual-hosted git repository. mcvsubbu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7326d4a Fixed backward incompatibility for existing stream implementations (#5549) 7326d4a is described below commit 7326d4ad7a7db1d07d760f6ace1b59fa4740a8d0 Author: Subbu Subramaniam <mcvsu...@users.noreply.github.com> AuthorDate: Fri Jun 12 10:46:02 2020 -0700 Fixed backward incompatibility for existing stream implementations (#5549) PR #5542 introduced incompatibility in the interfaces for existing stream implementations (like eventhub or other kafka implementations) Added default implementations so that the other streams can still compile with pinot. --- .../data/manager/realtime/LLRealtimeSegmentDataManager.java | 3 +-- .../realtime/impl/fakestream/FakeStreamMessageBatch.java | 6 +++++- .../plugin/stream/kafka09/SimpleConsumerMessageBatch.java | 7 ++++++- .../pinot/plugin/stream/kafka20/KafkaMessageBatch.java | 7 ++++++- .../main/java/org/apache/pinot/spi/stream/MessageBatch.java | 12 +++++++++++- .../org/apache/pinot/spi/stream/StreamMetadataProvider.java | 7 +++++-- 6 files changed, 34 insertions(+), 8 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 05f9340..6dcef0a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -66,7 +66,6 @@ import org.apache.pinot.spi.config.table.SegmentPartitionConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.plugin.PluginManager; import org.apache.pinot.spi.stream.MessageBatch; import org.apache.pinot.spi.stream.PartitionLevelConsumer; import org.apache.pinot.spi.stream.PartitionLevelStreamConfig; @@ -506,7 +505,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { realtimeRowsDroppedMeter); } - _currentOffset = messagesAndOffsets.getNextStreamMessageOffsetAtIndex(index); + _currentOffset = messagesAndOffsets.getNextStreamParitionMsgOffsetAtIndex(index); _numRowsIndexed = _realtimeSegment.getNumDocsIndexed(); _numRowsConsumed++; streamMessageCount++; diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java index 650d75a..59dd753 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java @@ -52,7 +52,11 @@ public class FakeStreamMessageBatch implements MessageBatch<byte[]> { return _messageBytes.get(index).length; } - public StreamPartitionMsgOffset getNextStreamMessageOffsetAtIndex(int index) { + public long getNextStreamMessageOffsetAtIndex(int index) { + throw new UnsupportedOperationException("This method is deprecated"); + } + + public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index) { return new LongMsgOffset(_messageOffsets.get(index) + 1); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/SimpleConsumerMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/SimpleConsumerMessageBatch.java index b2125ff..ab333de 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/SimpleConsumerMessageBatch.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/SimpleConsumerMessageBatch.java @@ -51,7 +51,12 @@ public class SimpleConsumerMessageBatch implements MessageBatch<byte[]> { return messageList.get(index).message().payloadSize(); } - public StreamPartitionMsgOffset getNextStreamMessageOffsetAtIndex(int index) { + @Override + public long getNextStreamMessageOffsetAtIndex(int index) { + throw new UnsupportedOperationException("This method is deprecated"); + } + + public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index) { return new LongMsgOffset(messageList.get(index).nextOffset()); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java index e5678ce..82b25a3 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java @@ -59,7 +59,12 @@ public class KafkaMessageBatch implements MessageBatch<byte[]> { } @Override - public StreamPartitionMsgOffset getNextStreamMessageOffsetAtIndex(int index) { + public long getNextStreamMessageOffsetAtIndex(int index) { + throw new UnsupportedOperationException("This method is deprecated"); + } + + @Override + public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index) { return new LongMsgOffset(messageList.get(index).getNextOffset()); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java index 68c5c2e..3052b9e 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java @@ -70,5 +70,15 @@ public interface MessageBatch<T> { * @param index * @return */ - StreamPartitionMsgOffset getNextStreamMessageOffsetAtIndex(int index); + @Deprecated + long getNextStreamMessageOffsetAtIndex(int index); + + /** + * Returns the offset of the next message. + * @param index + * @return + */ + default StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index) { + return new LongMsgOffset(getNextStreamMessageOffsetAtIndex(index)); + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java index 8b8176d..557ffc4 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java @@ -48,6 +48,9 @@ public interface StreamMetadataProvider extends Closeable { * @return * @throws java.util.concurrent.TimeoutException */ - StreamPartitionMsgOffset fetchStreamPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) - throws java.util.concurrent.TimeoutException; + default StreamPartitionMsgOffset fetchStreamPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) + throws java.util.concurrent.TimeoutException { + long offset = fetchPartitionOffset(offsetCriteria, timeoutMillis); + return new LongMsgOffset(offset); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org