This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 31c64a0cc138146dc59c1ce665f3ca72fd7b52f9 Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Thu Dec 31 17:19:24 2020 -0800 An attempt at server-side changes --- .../realtime/LLRealtimeSegmentDataManager.java | 22 +++++++++++++--------- .../org/apache/pinot/spi/stream/FetchResult.java | 5 +---- .../pinot/spi/stream/PartitionGroupConsumer.java | 2 +- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 0938251..054676e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -71,8 +71,10 @@ import org.apache.pinot.spi.config.table.SegmentPartitionConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.stream.FetchResult; import org.apache.pinot.spi.stream.MessageBatch; -import org.apache.pinot.spi.stream.PartitionLevelConsumer; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.PartitionLevelStreamConfig; import org.apache.pinot.spi.stream.PermanentConsumerException; import org.apache.pinot.spi.stream.RowMetadata; @@ -249,10 +251,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { private Thread _consumerThread; private final String _streamTopic; private final int _partitionGroupId; + private final PartitionGroupMetadata _partitionGroupMetadata; final String _clientId; private final LLCSegmentName _llcSegmentName; private final RecordTransformer _recordTransformer; - private PartitionLevelConsumer 
_partitionLevelConsumer = null; + private PartitionGroupConsumer _partitionGroupConsumer = null; private StreamMetadataProvider _streamMetadataProvider = null; private final File _resourceTmpDir; private final String _tableNameWithType; @@ -381,12 +384,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // Update _currentOffset upon return from this method MessageBatch messageBatch; try { - messageBatch = _partitionLevelConsumer + FetchResult fetchResult = _partitionGroupConsumer .fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis()); + messageBatch = fetchResult.getMessages(); consecutiveErrorCount = 0; - } catch (TimeoutException e) { - handleTransientStreamErrors(e); - continue; } catch (TransientConsumerException e) { handleTransientStreamErrors(e); continue; @@ -899,7 +900,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { private void closePartitionLevelConsumer() { try { - _partitionLevelConsumer.close(); + _partitionGroupConsumer.close(); } catch (Exception e) { segmentLogger.warn("Could not close stream consumer", e); } @@ -1131,6 +1132,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _segmentNameStr = _segmentZKMetadata.getSegmentName(); _llcSegmentName = llcSegmentName; _partitionGroupId = _llcSegmentName.getPartitionGroupId(); + _partitionGroupMetadata = new PartitionGroupMetadata(_partitionGroupId, _llcSegmentName.getSequenceNumber(), + _segmentZKMetadata.getStartOffset(), _segmentZKMetadata.getEndOffset(), + _segmentZKMetadata.getStatus().toString()); _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore; _acquiredConsumerSemaphore = new AtomicBoolean(false); _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _partitionGroupId; @@ -1311,11 +1315,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { * @param reason */ private void makeStreamConsumer(String 
reason) { - if (_partitionLevelConsumer != null) { + if (_partitionGroupConsumer != null) { closePartitionLevelConsumer(); } segmentLogger.info("Creating new stream consumer, reason: {}", reason); - _partitionLevelConsumer = _streamConsumerFactory.createPartitionLevelConsumer(_clientId, _partitionGroupId); + _partitionGroupConsumer = _streamConsumerFactory.createPartitionGroupConsumer(_partitionGroupMetadata); } /** diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java index b0ed6e5..7e8a911 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java @@ -18,10 +18,7 @@ */ package org.apache.pinot.spi.stream; -import java.util.List; - - public interface FetchResult<T> { Checkpoint getLastCheckpoint(); - List<T> getMessages(); + MessageBatch<T> getMessages(); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java index e096e67..bbbdaad 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java @@ -22,5 +22,5 @@ import java.io.Closeable; public interface PartitionGroupConsumer extends Closeable { - FetchResult fetch(Checkpoint start, Checkpoint end, long timeout); + FetchResult fetchMessages(Checkpoint start, Checkpoint end, long timeout); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org