This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch pub-sub
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit a485eaf0a462a6003d7313d4417a1ad17311b63f Author: kishoreg <g.kish...@gmail.com> AuthorDate: Thu Nov 25 09:14:05 2021 -1000 Interface changes needed to support pub-sub --- .../realtime/LLRealtimeSegmentDataManager.java | 341 ++++++++++++++------- .../pinot/spi/stream/PartitionGroupConsumer.java | 19 +- 2 files changed, 247 insertions(+), 113 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index a36935d..a7dc545 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -94,11 +94,12 @@ import org.joda.time.DateTimeZone; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** - * Segment data manager for low level consumer realtime segments, which manages consumption and segment completion. + * Segment data manager for low level consumer realtime segments, which manages consumption and + * segment completion. */ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { + protected enum State { // The state machine starts off with this state. While in this state we consume stream events // and index them in memory. We continue to be in this state until the end criteria is satisfied @@ -140,11 +141,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { ERROR; public boolean shouldConsume() { - return this.equals(INITIAL_CONSUMING) || this.equals(CATCHING_UP) || this.equals(CONSUMING_TO_ONLINE); + return this.equals(INITIAL_CONSUMING) || this.equals(CATCHING_UP) || this + .equals(CONSUMING_TO_ONLINE); } public boolean isFinal() { - return this.equals(ERROR) || this.equals(COMMITTED) || this.equals(RETAINED) || this.equals(DISCARDED); + return this.equals(ERROR) || this.equals(COMMITTED) || this.equals(RETAINED) || this + .equals(DISCARDED); } } @@ -152,6 +155,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { @VisibleForTesting public class SegmentBuildDescriptor { + final File _segmentTarFile; final Map<String, File> _metadataFileMap; final StreamPartitionMsgOffset _offset; @@ -159,8 +163,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { final long _buildTimeMillis; final long _segmentSizeBytes; - public SegmentBuildDescriptor(@Nullable File segmentTarFile, @Nullable Map<String, File> metadataFileMap, - StreamPartitionMsgOffset offset, long buildTimeMillis, long waitTimeMillis, long segmentSizeBytes) { + public SegmentBuildDescriptor(@Nullable File segmentTarFile, + @Nullable Map<String, File> metadataFileMap, + StreamPartitionMsgOffset offset, long buildTimeMillis, long waitTimeMillis, + long segmentSizeBytes) { _segmentTarFile = segmentTarFile; _metadataFileMap = metadataFileMap; _offset = _streamPartitionMsgOffsetFactory.create(offset); @@ -301,23 +307,27 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // the max time we are 
allowed to consume. if (now >= _consumeEndTime) { if (_realtimeSegment.getNumDocsIndexed() == 0) { - _segmentLogger.info("No events came in, extending time by {} hours", TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS); + _segmentLogger.info("No events came in, extending time by {} hours", + TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS); _consumeEndTime += TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS); return false; } _segmentLogger - .info("Stopping consumption due to time limit start={} now={} numRowsConsumed={} numRowsIndexed={}", + .info( + "Stopping consumption due to time limit start={} now={} numRowsConsumed={} numRowsIndexed={}", _startTimeMs, now, _numRowsConsumed, _numRowsIndexed); _stopReason = SegmentCompletionProtocol.REASON_TIME_LIMIT; return true; } else if (_numRowsIndexed >= _segmentMaxRowCount) { - _segmentLogger.info("Stopping consumption due to row limit nRows={} numRowsIndexed={}, numRowsConsumed={}", + _segmentLogger.info( + "Stopping consumption due to row limit nRows={} numRowsIndexed={}, numRowsConsumed={}", _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount); _stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT; return true; } else if (_endOfPartitionGroup) { - _segmentLogger.info("Stopping consumption due to end of partitionGroup reached nRows={} numRowsIndexed={}, " - + "numRowsConsumed={}", _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount); + _segmentLogger.info( + "Stopping consumption due to end of partitionGroup reached nRows={} numRowsIndexed={}, " + + "numRowsConsumed={}", _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount); _stopReason = SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP; return true; } @@ -334,7 +344,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { return true; } if (_currentOffset.compareTo(_finalOffset) > 0) { - _segmentLogger.error("Offset higher in state={}, current={}, final={}", _state.toString(), _currentOffset, + _segmentLogger.error("Offset higher in state={}, current={}, final={}", _state.toString(), + _currentOffset, _finalOffset); throw new RuntimeException("Past max offset"); } @@ -348,11 +359,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _segmentLogger.info("Caught up to offset={}, state={}", _finalOffset, _state.toString()); return true; } else if (now >= _consumeEndTime) { - _segmentLogger.info("Past max time budget: offset={}, state={}", _currentOffset, _state.toString()); + _segmentLogger + .info("Past max time budget: offset={}, state={}", _currentOffset, _state.toString()); return true; } if (_currentOffset.compareTo(_finalOffset) > 0) { - _segmentLogger.error("Offset higher in state={}, current={}, final={}", _state.toString(), _currentOffset, + _segmentLogger.error("Offset higher in state={}, current={}, final={}", _state.toString(), + _currentOffset, _finalOffset); throw new RuntimeException("Past max offset"); } @@ -367,12 +380,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { throws Exception { _consecutiveErrorCount++; if (_consecutiveErrorCount > MAX_CONSECUTIVE_ERROR_COUNT) { - _segmentLogger.warn("Stream transient exception when fetching messages, stopping consumption after {} attempts", + _segmentLogger.warn( + "Stream transient exception when fetching messages, stopping consumption after {} attempts", _consecutiveErrorCount, e); throw e; } else { _segmentLogger - .warn("Stream transient exception when fetching messages, retrying (count={})", _consecutiveErrorCount, e); + 
.warn("Stream transient exception when fetching messages, retrying (count={})", + _consecutiveErrorCount, e); Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); makeStreamConsumer("Too many transient errors"); } @@ -382,8 +397,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { throws Exception { _numRowsErrored = 0; final long idlePipeSleepTimeMillis = 100; - final long maxIdleCountBeforeStatUpdate = (3 * 60 * 1000) / (idlePipeSleepTimeMillis + _partitionLevelStreamConfig - .getFetchTimeoutMillis()); // 3 minute count + final long maxIdleCountBeforeStatUpdate = + (3 * 60 * 1000) / (idlePipeSleepTimeMillis + _partitionLevelStreamConfig + .getFetchTimeoutMillis()); // 3 minute count StreamPartitionMsgOffset lastUpdatedOffset = _streamPartitionMsgOffsetFactory .create(_currentOffset); // so that we always update the metric when we enter this method. long consecutiveIdleCount = 0; @@ -391,14 +407,16 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // anymore. Remove the file if it exists. removeSegmentFile(); - _segmentLogger.info("Starting consumption loop start offset {}, finalOffset {}", _currentOffset, _finalOffset); + _segmentLogger.info("Starting consumption loop start offset {}, finalOffset {}", _currentOffset, + _finalOffset); while (!_shouldStop && !endCriteriaReached()) { // Consume for the next readTime ms, or we get to final offset, whichever happens earlier, // Update _currentOffset upon return from this method MessageBatch messageBatch; try { messageBatch = _partitionGroupConsumer - .fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis()); + .fetchMessages(_currentOffset, null, + _partitionLevelStreamConfig.getFetchTimeoutMillis()); _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup(); _consecutiveErrorCount = 0; } catch (TimeoutException e) { @@ -408,7 +426,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { handleTransientStreamErrors(e); continue; } catch (PermanentConsumerException e) { - _segmentLogger.warn("Permanent exception from stream when fetching messages, stopping consumption", e); + _segmentLogger + .warn("Permanent exception from stream when fetching messages, stopping consumption", + e); throw e; } catch (Exception e) { // Unknown exception from stream. Treat as a transient exception. @@ -435,7 +455,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // Create a new stream consumer wrapper, in case we are stuck on something. 
consecutiveIdleCount++; if (consecutiveIdleCount > maxIdleCountBeforeStatUpdate) { - _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1); + _serverMetrics + .setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1); consecutiveIdleCount = 0; makeStreamConsumer("Idle for too long"); } @@ -443,8 +464,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { } if (_numRowsErrored > 0) { - _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored); - _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored); + _serverMetrics + .addMeteredTableValue(_metricKeyName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored); + _serverMetrics + .addMeteredTableValue(_tableStreamName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored); } return true; } @@ -479,7 +502,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // Throw an exception. // _segmentLogger - .error("Buffer full with {} rows consumed (row limit {}, indexed {})", _numRowsConsumed, _numRowsIndexed, + .error("Buffer full with {} rows consumed (row limit {}, indexed {})", _numRowsConsumed, + _numRowsIndexed, _segmentMaxRowCount); throw new RuntimeException("Realtime segment full"); } @@ -491,7 +515,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { RowMetadata msgMetadata = messagesAndOffsets.getMetadataAtIndex(index); GenericRow decodedRow = _messageDecoder - .decode(messagesAndOffsets.getMessageAtIndex(index), messagesAndOffsets.getMessageOffsetAtIndex(index), + .decode(messagesAndOffsets.getMessageAtIndex(index), + messagesAndOffsets.getMessageOffsetAtIndex(index), messagesAndOffsets.getMessageLengthAtIndex(index), reuse); if (decodedRow != null) { try { @@ -500,7 +525,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { decodedRow = _complexTypeTransformer.transform(decodedRow); } if (decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) { - for (Object singleRow : (Collection) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) { + for (Object singleRow : (Collection) decodedRow + .getValue(GenericRow.MULTIPLE_RECORDS_KEY)) { GenericRow transformedRow = _recordTransformer.transform((GenericRow) singleRow); if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) { realtimeRowsConsumedMeter = _serverMetrics @@ -510,7 +536,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata); } else { realtimeRowsDroppedMeter = _serverMetrics - .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1, + .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, + 1, realtimeRowsDroppedMeter); } } @@ -524,16 +551,19 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata); } else { realtimeRowsDroppedMeter = _serverMetrics - .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1, + .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, + 1, realtimeRowsDroppedMeter); } } } catch (Exception e) { - String errorMessage = String.format("Caught exception while transforming the record: %s", decodedRow); + String errorMessage = String + .format("Caught exception while transforming the record: 
%s", decodedRow); _segmentLogger.error(errorMessage, e); _numRowsErrored++; _realtimeTableDataManager - .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e)); + .addSegmentError(_segmentNameStr, + new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e)); } } else { realtimeRowsDroppedMeter = _serverMetrics @@ -548,7 +578,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { } updateCurrentDocumentCountMetrics(); if (streamMessageCount != 0) { - _segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}", indexedMessageCount, + _segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}", + indexedMessageCount, streamMessageCount, _currentOffset); } else { // If there were no messages to be fetched from stream, wait for a little bit as to avoid hammering the stream @@ -557,6 +588,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { } public class PartitionConsumer implements Runnable { + public void run() { long initialConsumptionEnd = 0L; long lastCatchUpStart = 0L; @@ -567,7 +599,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { if (_state.shouldConsume()) { consumeLoop(); // Consume until we reached the end criteria, or we are stopped. } - _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0); + _serverMetrics + .setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0); if (_shouldStop) { break; } @@ -580,7 +613,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { } else if (_state == State.CATCHING_UP) { catchUpTimeMillis += now() - lastCatchUpStart; _serverMetrics - .setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS, + .setValueOfTableGauge(_metricKeyName, + ServerGauge.LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS, TimeUnit.MILLISECONDS.toSeconds(catchUpTimeMillis)); } @@ -599,8 +633,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { case CATCH_UP: if (rspOffset.compareTo(_currentOffset) <= 0) { // Something wrong with the controller. Back off and try again. - _segmentLogger.error("Invalid catchup offset {} in controller response, current offset {}", rspOffset, - _currentOffset); + _segmentLogger + .error("Invalid catchup offset {} in controller response, current offset {}", + rspOffset, + _currentOffset); hold(); } else { _state = State.CATCHING_UP; @@ -631,7 +667,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // Could not build segment for some reason. We can only download it. _state = State.ERROR; _realtimeTableDataManager.addSegmentError(_segmentNameStr, - new SegmentErrorInfo(System.currentTimeMillis(), "Could not build segment", null)); + new SegmentErrorInfo(System.currentTimeMillis(), "Could not build segment", + null)); } break; default: @@ -640,13 +677,21 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { break; case COMMIT: _state = State.COMMITTING; + StreamPartitionMsgOffset endOffset = _partitionGroupConsumer.commit(_currentOffset); + if (endOffset != null) { + _currentOffset = endOffset; + } + long buildTimeSeconds = response.getBuildTimeSeconds(); + buildSegmentForCommit(buildTimeSeconds * 1000L); + boolean commitSuccess = false; if (_segmentBuildDescriptor == null) { // We could not build the segment. Go into error state. 
_state = State.ERROR; _realtimeTableDataManager.addSegmentError(_segmentNameStr, - new SegmentErrorInfo(System.currentTimeMillis(), "Could not build segment", null)); + new SegmentErrorInfo(System.currentTimeMillis(), "Could not build segment", + null)); } else { success = commitSegment(response.getControllerVipUrl(), response.isSplitCommit() && _indexLoadingConfig.isEnableSplitCommit()); @@ -660,9 +705,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { hold(); } } + if (_state != State.COMMITTED) { + _partitionGroupConsumer.rollback(); + } + break; default: - _segmentLogger.error("Holding after response from Controller: {}", response.toJsonString()); + _segmentLogger + .error("Holding after response from Controller: {}", response.toJsonString()); hold(); break; } @@ -673,7 +723,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { postStopConsumedMsg(e.getClass().getName()); _state = State.ERROR; _realtimeTableDataManager - .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e)); + .addSegmentError(_segmentNameStr, + new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e)); _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0); return; } @@ -682,7 +733,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { if (initialConsumptionEnd != 0L) { _serverMetrics - .setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS, + .setValueOfTableGauge(_metricKeyName, + ServerGauge.LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS, TimeUnit.MILLISECONDS.toSeconds(now() - initialConsumptionEnd)); } // There is a race condition that the destroy() method can be called which ends up calling stop on the consumer. @@ -703,7 +755,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { private CompletionMode getSegmentCompletionMode() { CompletionConfig completionConfig = _tableConfig.getValidationConfig().getCompletionConfig(); if (completionConfig != null) { - if (CompletionMode.DOWNLOAD.toString().equalsIgnoreCase(completionConfig.getCompletionMode())) { + if (CompletionMode.DOWNLOAD.toString() + .equalsIgnoreCase(completionConfig.getCompletionMode())) { return CompletionMode.DOWNLOAD; } } @@ -724,7 +777,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // built the segment successfully. protected void buildSegmentForCommit(long buildTimeLeaseMs) { try { - if (_segmentBuildDescriptor != null && _segmentBuildDescriptor.getOffset().compareTo(_currentOffset) == 0) { + if (_segmentBuildDescriptor != null + && _segmentBuildDescriptor.getOffset().compareTo(_currentOffset) == 0) { // Double-check that we have the file, just in case. File segmentTarFile = _segmentBuildDescriptor.getSegmentTarFile(); if (segmentTarFile != null && segmentTarFile.exists()) { @@ -734,7 +788,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { removeSegmentFile(); if (buildTimeLeaseMs <= 0) { if (_segBuildSemaphore == null) { - buildTimeLeaseMs = SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds() * 1000L; + buildTimeLeaseMs = + SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds() * 1000L; } else { // We know we are going to use a semaphore to limit number of segment builds, and could be // blocked for a long time. 
The controller has not provided a lease time, so set one to @@ -806,7 +861,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { File tempSegmentFolder = new File(_resourceTmpDir, "tmp-" + _segmentNameStr + "-" + now()); // lets convert the segment now RealtimeSegmentConverter converter = - new RealtimeSegmentConverter(_realtimeSegment, tempSegmentFolder.getAbsolutePath(), _schema, + new RealtimeSegmentConverter(_realtimeSegment, tempSegmentFolder.getAbsolutePath(), + _schema, _tableNameWithType, _tableConfig, _segmentZKMetadata.getSegmentName(), _sortedColumn, _invertedIndexColumns, _textIndexColumns, _fstIndexColumns, _noDictionaryColumns, _varLengthDictionaryColumns, _nullHandlingEnabled); @@ -821,7 +877,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { final long buildTimeMillis = now() - lockAcquireTimeMillis; final long waitTimeMillis = lockAcquireTimeMillis - startTimeMillis; _segmentLogger - .info("Successfully built segment in {} ms, after lockWaitTime {} ms", buildTimeMillis, waitTimeMillis); + .info("Successfully built segment in {} ms, after lockWaitTime {} ms", buildTimeMillis, + waitTimeMillis); File dataDir = new File(_resourceDataDir); File indexDir = new File(dataDir, _segmentNameStr); @@ -834,55 +891,67 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { FileUtils.moveDirectory(tempIndexDir, indexDir); } catch (IOException e) { String errorMessage = - String.format("Caught exception while moving index directory from: %s to: %s", tempIndexDir, indexDir); + String.format("Caught exception while moving index directory from: %s to: %s", + tempIndexDir, indexDir); _segmentLogger.error(errorMessage, e); _realtimeTableDataManager - .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e)); + .addSegmentError(_segmentNameStr, + new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e)); return null; } finally { FileUtils.deleteQuietly(tempSegmentFolder); } long segmentSizeBytes = FileUtils.sizeOfDirectory(indexDir); - _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_CREATION_DURATION_SECONDS, + _serverMetrics.setValueOfTableGauge(_metricKeyName, + ServerGauge.LAST_REALTIME_SEGMENT_CREATION_DURATION_SECONDS, TimeUnit.MILLISECONDS.toSeconds(buildTimeMillis)); - _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_CREATION_WAIT_TIME_SECONDS, + _serverMetrics.setValueOfTableGauge(_metricKeyName, + ServerGauge.LAST_REALTIME_SEGMENT_CREATION_WAIT_TIME_SECONDS, TimeUnit.MILLISECONDS.toSeconds(waitTimeMillis)); if (forCommit) { - File segmentTarFile = new File(dataDir, _segmentNameStr + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); + File segmentTarFile = new File(dataDir, + _segmentNameStr + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); try { TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile); } catch (IOException e) { String errorMessage = - String.format("Caught exception while taring index directory from: %s to: %s", indexDir, segmentTarFile); + String + .format("Caught exception while taring index directory from: %s to: %s", indexDir, + segmentTarFile); _segmentLogger.error(errorMessage, e); _realtimeTableDataManager - .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e)); + .addSegmentError(_segmentNameStr, + new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e)); return null; } File metadataFile = 
SegmentDirectoryPaths.findMetadataFile(indexDir); if (metadataFile == null) { _segmentLogger - .error("Failed to find file: {} under index directory: {}", V1Constants.MetadataKeys.METADATA_FILE_NAME, + .error("Failed to find file: {} under index directory: {}", + V1Constants.MetadataKeys.METADATA_FILE_NAME, indexDir); return null; } File creationMetaFile = SegmentDirectoryPaths.findCreationMetaFile(indexDir); if (creationMetaFile == null) { _segmentLogger - .error("Failed to find file: {} under index directory: {}", V1Constants.SEGMENT_CREATION_META, indexDir); + .error("Failed to find file: {} under index directory: {}", + V1Constants.SEGMENT_CREATION_META, indexDir); return null; } Map<String, File> metadataFiles = new HashMap<>(); metadataFiles.put(V1Constants.MetadataKeys.METADATA_FILE_NAME, metadataFile); metadataFiles.put(V1Constants.SEGMENT_CREATION_META, creationMetaFile); - return new SegmentBuildDescriptor(segmentTarFile, metadataFiles, _currentOffset, buildTimeMillis, + return new SegmentBuildDescriptor(segmentTarFile, metadataFiles, _currentOffset, + buildTimeMillis, waitTimeMillis, segmentSizeBytes); } else { - return new SegmentBuildDescriptor(null, null, _currentOffset, buildTimeMillis, waitTimeMillis, + return new SegmentBuildDescriptor(null, null, _currentOffset, buildTimeMillis, + waitTimeMillis, segmentSizeBytes); } } catch (InterruptedException e) { @@ -904,7 +973,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { } SegmentCompletionProtocol.Response commitResponse = commit(controllerVipUrl, isSplitCommit); - if (!commitResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) { + if (!commitResponse.getStatus() + .equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) { return false; } @@ -913,7 +983,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { return true; } - protected SegmentCompletionProtocol.Response commit(String controllerVipUrl, boolean isSplitCommit) { + protected SegmentCompletionProtocol.Response commit(String controllerVipUrl, + boolean isSplitCommit) { SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params(); params.withSegmentName(_segmentNameStr).withStreamPartitionMsgOffset(_currentOffset.toString()) @@ -927,7 +998,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { SegmentCommitter segmentCommitter; try { - segmentCommitter = _segmentCommitterFactory.createSegmentCommitter(isSplitCommit, params, controllerVipUrl); + segmentCommitter = _segmentCommitterFactory + .createSegmentCommitter(isSplitCommit, params, controllerVipUrl); } catch (URISyntaxException e) { _segmentLogger.error("Failed to create a segment committer: ", e); return SegmentCompletionProtocol.RESP_NOT_SENT; @@ -971,12 +1043,16 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { /** * Cleans up the metrics that reflects the state of the realtime segment. - * This step is essential as the instance may not be the target location for some of the partitions. - * E.g. if the number of partitions increases, or a host swap is needed, the target location for some partitions + * This step is essential as the instance may not be the target location for some of the + * partitions. + * E.g. if the number of partitions increases, or a host swap is needed, the target location for + * some partitions * may change, - * and the current host remains to run. 
In this case, the current server would still keep the state of the old + * and the current host remains to run. In this case, the current server would still keep the + * state of the old * partitions, - * which no longer resides in this host any more, thus causes false positive information to the metric system. + * which no longer resides in this host any more, thus causes false positive information to the + * metric system. */ private void cleanupMetrics() { _serverMetrics.removeTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING); @@ -994,10 +1070,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { protected void postStopConsumedMsg(String reason) { do { SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params(); - params.withStreamPartitionMsgOffset(_currentOffset.toString()).withReason(reason).withSegmentName(_segmentNameStr) + params.withStreamPartitionMsgOffset(_currentOffset.toString()).withReason(reason) + .withSegmentName(_segmentNameStr) .withInstanceId(_instanceId); - SegmentCompletionProtocol.Response response = _protocolHandler.segmentStoppedConsuming(params); + SegmentCompletionProtocol.Response response = _protocolHandler + .segmentStoppedConsuming(params); if (response.getStatus() == SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED) { _segmentLogger.info("Got response {}", response.toJsonString()); break; @@ -1036,7 +1114,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { final StreamPartitionMsgOffset endOffset = _streamPartitionMsgOffsetFactory.create(segmentZKMetadata.getEndOffset()); _segmentLogger - .info("State: {}, transitioning from CONSUMING to ONLINE (startOffset: {}, endOffset: {})", _state.toString(), + .info( + "State: {}, transitioning from CONSUMING to ONLINE (startOffset: {}, endOffset: {})", + _state.toString(), _startOffset, endOffset); stop(); _segmentLogger.info("Consumer thread stopped in state {}", _state.toString()); @@ -1058,8 +1138,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { CompletionMode segmentCompletionMode = getSegmentCompletionMode(); switch (segmentCompletionMode) { case DOWNLOAD: - _segmentLogger.info("State {}. CompletionMode {}. Downloading to replace", _state.toString(), - segmentCompletionMode); + _segmentLogger + .info("State {}. CompletionMode {}. Downloading to replace", _state.toString(), + segmentCompletionMode); downloadSegmentAndReplace(segmentZKMetadata); break; case DEFAULT: @@ -1067,23 +1148,28 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { if (_currentOffset.compareTo(endOffset) > 0) { // We moved ahead of the offset that is committed in ZK. _segmentLogger - .warn("Current offset {} ahead of the offset in zk {}. Downloading to replace", _currentOffset, + .warn("Current offset {} ahead of the offset in zk {}. Downloading to replace", + _currentOffset, endOffset); downloadSegmentAndReplace(segmentZKMetadata); } else if (_currentOffset.compareTo(endOffset) == 0) { _segmentLogger - .info("Current offset {} matches offset in zk {}. Replacing segment", _currentOffset, endOffset); + .info("Current offset {} matches offset in zk {}. 
Replacing segment", + _currentOffset, endOffset); buildSegmentAndReplace(); } else { - _segmentLogger.info("Attempting to catch up from offset {} to {} ", _currentOffset, endOffset); + _segmentLogger.info("Attempting to catch up from offset {} to {} ", _currentOffset, + endOffset); boolean success = catchupToFinalOffset(endOffset, - TimeUnit.MILLISECONDS.convert(MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS, TimeUnit.SECONDS)); + TimeUnit.MILLISECONDS + .convert(MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS, TimeUnit.SECONDS)); if (success) { _segmentLogger.info("Caught up to offset {}", _currentOffset); buildSegmentAndReplace(); } else { _segmentLogger - .info("Could not catch up to offset (current = {}). Downloading to replace", _currentOffset); + .info("Could not catch up to offset (current = {}). Downloading to replace", + _currentOffset); downloadSegmentAndReplace(segmentZKMetadata); } } @@ -1093,7 +1179,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { } break; default: - _segmentLogger.info("Downloading to replace segment while in state {}", _state.toString()); + _segmentLogger + .info("Downloading to replace segment while in state {}", _state.toString()); downloadSegmentAndReplace(segmentZKMetadata); break; } @@ -1107,7 +1194,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { protected void downloadSegmentAndReplace(SegmentZKMetadata segmentZKMetadata) { closeStreamConsumers(); _realtimeTableDataManager - .downloadAndReplaceSegment(_segmentNameStr, segmentZKMetadata, _indexLoadingConfig, _tableConfig); + .downloadAndReplaceSegment(_segmentNameStr, segmentZKMetadata, _indexLoadingConfig, + _tableConfig); } protected long now() { @@ -1130,7 +1218,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { } if (_currentOffset.compareTo(endOffset) != 0) { // Timeout? - _segmentLogger.error("Could not consume up to {} (current offset {})", endOffset, _currentOffset); + _segmentLogger + .error("Could not consume up to {} (current offset {})", endOffset, _currentOffset); return false; } @@ -1175,9 +1264,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // Assume that this is called only on OFFLINE to CONSUMING transition. // If the transition is OFFLINE to ONLINE, the caller should have downloaded the segment and we don't reach here. 
public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig, - RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig, + RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, + IndexLoadingConfig indexLoadingConfig, Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionGroupConsumerSemaphore, - ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager) { + ServerMetrics serverMetrics, + @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager) { _segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore(); _segmentZKMetadata = segmentZKMetadata; _tableConfig = tableConfig; @@ -1190,16 +1281,19 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _segmentVersion = indexLoadingConfig.getSegmentVersion(); _instanceId = _realtimeTableDataManager.getServerInstance(); _leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_tableNameWithType); - _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType); + _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, + _tableNameWithType); String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); // TODO Validate configs IndexingConfig indexingConfig = _tableConfig.getIndexingConfig(); _partitionLevelStreamConfig = - new PartitionLevelStreamConfig(_tableNameWithType, IngestionConfigUtils.getStreamConfigMap(_tableConfig)); + new PartitionLevelStreamConfig(_tableNameWithType, + IngestionConfigUtils.getStreamConfigMap(_tableConfig)); _streamConsumerFactory = StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig); _streamPartitionMsgOffsetFactory = - StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig).createStreamMsgOffsetFactory(); + StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig) + .createStreamMsgOffsetFactory(); _streamTopic = _partitionLevelStreamConfig.getTopicName(); _segmentNameStr = _segmentZKMetadata.getSegmentName(); _llcSegmentName = llcSegmentName; @@ -1213,26 +1307,31 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore; _acquiredConsumerSemaphore = new AtomicBoolean(false); _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _partitionGroupId; - _segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr); + _segmentLogger = LoggerFactory + .getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr); _tableStreamName = _tableNameWithType + "_" + _streamTopic; _memoryManager = getMemoryManager(realtimeTableDataManager.getConsumerDir(), _segmentNameStr, - indexLoadingConfig.isRealtimeOffHeapAllocation(), indexLoadingConfig.isDirectRealtimeOffHeapAllocation(), + indexLoadingConfig.isRealtimeOffHeapAllocation(), + indexLoadingConfig.isDirectRealtimeOffHeapAllocation(), serverMetrics); List<String> sortedColumns = indexLoadingConfig.getSortedColumns(); if (sortedColumns.isEmpty()) { - _segmentLogger.info("RealtimeDataResourceZKMetadata contains no information about sorted column for segment {}", + _segmentLogger.info( + "RealtimeDataResourceZKMetadata contains no information about sorted column for segment {}", _llcSegmentName); _sortedColumn = null; } else { String firstSortedColumn = sortedColumns.get(0); if 
(_schema.hasColumn(firstSortedColumn)) { - _segmentLogger.info("Setting sorted column name: {} from RealtimeDataResourceZKMetadata for segment {}", + _segmentLogger.info( + "Setting sorted column name: {} from RealtimeDataResourceZKMetadata for segment {}", firstSortedColumn, _llcSegmentName); _sortedColumn = firstSortedColumn; } else { _segmentLogger - .warn("Sorted column name: {} from RealtimeDataResourceZKMetadata is not existed in schema for segment {}.", + .warn( + "Sorted column name: {} from RealtimeDataResourceZKMetadata is not existed in schema for segment {}.", firstSortedColumn, _llcSegmentName); _sortedColumn = null; } @@ -1251,7 +1350,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // No dictionary Columns _noDictionaryColumns = new ArrayList<>(indexLoadingConfig.getNoDictionaryColumns()); - _varLengthDictionaryColumns = new ArrayList<>(indexLoadingConfig.getVarLengthDictionaryColumns()); + _varLengthDictionaryColumns = new ArrayList<>( + indexLoadingConfig.getVarLengthDictionaryColumns()); // Read the max number of rows int segmentMaxRowCount = _partitionLevelStreamConfig.getFlushThresholdRows(); @@ -1274,26 +1374,32 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // Start new realtime segment String consumerDir = realtimeTableDataManager.getConsumerDir(); RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = - new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType).setSegmentName(_segmentNameStr) + new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType) + .setSegmentName(_segmentNameStr) .setStreamName(_streamTopic).setSchema(_schema).setTimeColumnName(timeColumnName) - .setCapacity(_segmentMaxRowCount).setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount()) + .setCapacity(_segmentMaxRowCount) + .setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount()) .setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns()) .setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns()) .setInvertedIndexColumns(invertedIndexColumns).setTextIndexColumns(textIndexColumns) - .setFSTIndexColumns(fstIndexColumns).setJsonIndexColumns(indexLoadingConfig.getJsonIndexColumns()) - .setH3IndexConfigs(indexLoadingConfig.getH3IndexConfigs()).setSegmentZKMetadata(segmentZKMetadata) + .setFSTIndexColumns(fstIndexColumns) + .setJsonIndexColumns(indexLoadingConfig.getJsonIndexColumns()) + .setH3IndexConfigs(indexLoadingConfig.getH3IndexConfigs()) + .setSegmentZKMetadata(segmentZKMetadata) .setOffHeap(_isOffHeap).setMemoryManager(_memoryManager) .setStatsHistory(realtimeTableDataManager.getStatsHistory()) - .setAggregateMetrics(indexingConfig.isAggregateMetrics()).setNullHandlingEnabled(_nullHandlingEnabled) + .setAggregateMetrics(indexingConfig.isAggregateMetrics()) + .setNullHandlingEnabled(_nullHandlingEnabled) .setConsumerDir(consumerDir).setUpsertMode(tableConfig.getUpsertMode()) .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager) .setHashFunction(tableConfig.getHashFunction()) .setUpsertComparisonColumn(tableConfig.getUpsertComparisonColumn()); // Create message decoder - Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema); + Set<String> fieldsToRead = IngestionUtils + .getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema); _messageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead); - _clientId = 
_streamTopic + "-" + _partitionGroupId; + _clientId = _instanceId + "-" + _streamTopic + "-" + _partitionGroupId; // Create record transformer _recordTransformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema); @@ -1315,9 +1421,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { SegmentPartitionConfig segmentPartitionConfig = indexingConfig.getSegmentPartitionConfig(); if (segmentPartitionConfig != null) { - Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap(); + Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig + .getColumnPartitionMap(); if (columnPartitionMap.size() == 1) { - Map.Entry<String, ColumnPartitionConfig> entry = columnPartitionMap.entrySet().iterator().next(); + Map.Entry<String, ColumnPartitionConfig> entry = columnPartitionMap.entrySet().iterator() + .next(); String partitionColumn = entry.getKey(); ColumnPartitionConfig columnPartitionConfig = entry.getValue(); String partitionFunctionName = columnPartitionConfig.getFunctionName(); @@ -1342,7 +1450,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _segmentLogger.warn( "Number of stream partitions: {} does not match number of partitions in the partition config: {}, " + "using number of stream " + "partitions", numPartitionGroups, numPartitions); - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_PARTITION_MISMATCH, 1); + _serverMetrics + .addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_PARTITION_MISMATCH, + 1); numPartitions = numPartitionGroups; } } catch (Exception e) { @@ -1354,10 +1464,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { realtimeSegmentConfigBuilder.setPartitionColumn(partitionColumn); realtimeSegmentConfigBuilder - .setPartitionFunction(PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions)); + .setPartitionFunction(PartitionFunctionFactory + .getPartitionFunction(partitionFunctionName, numPartitions)); realtimeSegmentConfigBuilder.setPartitionId(_partitionGroupId); } else { - _segmentLogger.warn("Cannot partition on multiple columns: {}", columnPartitionMap.keySet()); + _segmentLogger + .warn("Cannot partition on multiple columns: {}", columnPartitionMap.keySet()); } } @@ -1374,10 +1486,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { try (StreamMetadataProvider metadataProvider = _streamConsumerFactory .createPartitionMetadataProvider(_clientId, _partitionGroupId)) { _latestStreamOffsetAtStartupTime = metadataProvider - .fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, /*maxWaitTimeMs*/5000); + .fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, /*maxWaitTimeMs*/ + 5000); } catch (Exception e) { - _segmentLogger.warn("Cannot fetch latest stream offset for clientId {} and partitionGroupId {}", _clientId, - _partitionGroupId); + _segmentLogger + .warn("Cannot fetch latest stream offset for clientId {} and partitionGroupId {}", + _clientId, + _partitionGroupId); } long now = now(); @@ -1394,16 +1509,19 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // cases, we let some minimum consumption happen before we attempt to complete the segment (unless, of course // the max consumption time has been configured to be less than the minimum time we use in this class). 
long minConsumeTimeMillis = - Math.min(maxConsumeTimeMillis, TimeUnit.MILLISECONDS.convert(MINIMUM_CONSUME_TIME_MINUTES, TimeUnit.MINUTES)); + Math.min(maxConsumeTimeMillis, + TimeUnit.MILLISECONDS.convert(MINIMUM_CONSUME_TIME_MINUTES, TimeUnit.MINUTES)); if (_consumeEndTime - now < minConsumeTimeMillis) { _consumeEndTime = now + minConsumeTimeMillis; } _segmentCommitterFactory = - new SegmentCommitterFactory(_segmentLogger, _protocolHandler, tableConfig, indexLoadingConfig, serverMetrics); + new SegmentCommitterFactory(_segmentLogger, _protocolHandler, tableConfig, + indexLoadingConfig, serverMetrics); _segmentLogger - .info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", _llcSegmentName, + .info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", + _llcSegmentName, _segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC).toString()); start(); } @@ -1417,7 +1535,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { } _segmentLogger.info("Creating new stream consumer, reason: {}", reason); _partitionGroupConsumer = - _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus); + _streamConsumerFactory + .createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus); } /** @@ -1441,7 +1560,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // Number of rows indexed should be used for DOCUMENT_COUNT metric, and also for segment flush. Whereas, // Number of rows consumed should be used for consumption metric. long rowsIndexed = _numRowsIndexed - _lastUpdatedRowsIndexed.get(); - _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.DOCUMENT_COUNT, rowsIndexed); + _serverMetrics + .addValueToTableGauge(_tableNameWithType, ServerGauge.DOCUMENT_COUNT, rowsIndexed); _lastUpdatedRowsIndexed.set(_numRowsIndexed); final long now = now(); final int rowsConsumed = _numRowsConsumed - _lastConsumedCount; @@ -1451,7 +1571,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { || rowsConsumed >= MSG_COUNT_THRESHOLD_FOR_LOG) { _segmentLogger.info( "Consumed {} events from (rate:{}/s), currentOffset={}, numRowsConsumedSoFar={}, numRowsIndexedSoFar={}", - rowsConsumed, (float) (rowsConsumed) * 1000 / (now - prevTime), _currentOffset, _numRowsConsumed, + rowsConsumed, (float) (rowsConsumed) * 1000 / (now - prevTime), _currentOffset, + _numRowsConsumed, _numRowsIndexed); _lastConsumedCount = _numRowsConsumed; _lastLogTime = now; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java index 735c91d..9748945 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at
  *
 - * http://www.apache.org/licenses/LICENSE-2.0
 + * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -21,12 +21,15 @@ package org.apache.pinot.spi.stream;
 import java.io.Closeable;
 import java.util.concurrent.TimeoutException;

-
 /**
  * Consumer interface for consuming from a partition group of a stream
  */
 public interface PartitionGroupConsumer extends Closeable {
+  default void start(StreamPartitionMsgOffset startOffset) {
+
+  }
+
   /**
    * Fetch messages and offsets from the stream partition group
    *
@@ -37,6 +40,16 @@ public interface PartitionGroupConsumer extends Closeable {
    * milliseconds
    * @return An iterable containing messages fetched from the stream partition and their offsets
    */
-  MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset endOffset, int timeoutMs)
+  MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset,
+      StreamPartitionMsgOffset endOffset, int timeoutMs)
       throws TimeoutException;
+
+  default StreamPartitionMsgOffset commit(
+      final StreamPartitionMsgOffset currentOffset) {
+    return null;
+  }
+
+  default void rollback() {
+
+  }
 }
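For readers trying to see how the new hooks fit together: in the data-manager change above, the COMMIT branch now calls commit(_currentOffset) on the consumer before building the segment (adopting the returned offset when the stream reports a different committed end offset), and calls rollback() if the segment ultimately does not reach the COMMITTED state; start() is a one-time setup hook before fetching begins. The sketch below is a minimal, hypothetical PartitionGroupConsumer for an ack-based pub-sub stream that overrides the new defaults. It is illustrative only: AckingPubSubClient and all of its methods are invented for this example, and only PartitionGroupConsumer, StreamPartitionMsgOffset, MessageBatch and TimeoutException come from the Pinot SPI shown in this diff.

package org.example.pubsub;  // hypothetical package, not part of this commit

import java.util.concurrent.TimeoutException;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;

public class AckingPubSubConsumer implements PartitionGroupConsumer {

  /** Hypothetical SDK facade standing in for whatever pub-sub client is being wrapped. */
  public interface AckingPubSubClient {
    void subscribe(StreamPartitionMsgOffset startOffset);
    MessageBatch poll(StreamPartitionMsgOffset fromOffset, int timeoutMs) throws TimeoutException;
    StreamPartitionMsgOffset ack(StreamPartitionMsgOffset upTo);
    void nackAllPending();
    void shutdown();
  }

  private final AckingPubSubClient _client;

  public AckingPubSubConsumer(AckingPubSubClient client) {
    _client = client;
  }

  @Override
  public void start(StreamPartitionMsgOffset startOffset) {
    // One-time setup before the first fetchMessages() call, e.g. opening the subscription.
    _client.subscribe(startOffset);
  }

  @Override
  public MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset,
      StreamPartitionMsgOffset endOffset, int timeoutMs)
      throws TimeoutException {
    // Deliver whatever is available within the timeout; messages stay un-acked until commit().
    return _client.poll(startOffset, timeoutMs);
  }

  @Override
  public StreamPartitionMsgOffset commit(StreamPartitionMsgOffset currentOffset) {
    // The segment is being committed: acknowledge everything consumed so far and report the
    // offset the stream actually settled on. Returning null keeps the caller's own offset.
    return _client.ack(currentOffset);
  }

  @Override
  public void rollback() {
    // The segment commit failed: release un-acked messages so they can be redelivered.
    _client.nackAllPending();
  }

  @Override
  public void close() {
    _client.shutdown();
  }
}

Because the three new methods have no-op (or null-returning) defaults in the interface, existing offset-based consumers such as the Kafka and Kinesis plugins should continue to compile and behave as before; only streams that track delivery through acknowledgements need to override them.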