KKcorps commented on code in PR #10928: URL: https://github.com/apache/pinot/pull/10928#discussion_r1233322255
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java: ########## @@ -1360,9 +1361,9 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo firstSortedColumn, llcSegmentName); sortedColumn = firstSortedColumn; } else { - _segmentLogger - .warn("Sorted column name: {} from RealtimeDataResourceZKMetadata is not existed in schema for segment {}.", - firstSortedColumn, llcSegmentName); + _segmentLogger.warn( Review Comment: nit: Unnecessary formatting ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java: ########## @@ -1386,27 +1387,26 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo _nullHandlingEnabled = indexingConfig.isNullHandlingEnabled(); - _columnIndicesForRealtimeTable = new ColumnIndicesForRealtimeTable(sortedColumn, - new ArrayList<>(indexLoadingConfig.getInvertedIndexColumns()), - new ArrayList<>(indexLoadingConfig.getTextIndexColumns()), - new ArrayList<>(indexLoadingConfig.getFSTIndexColumns()), - new ArrayList<>(indexLoadingConfig.getNoDictionaryColumns()), - new ArrayList<>(indexLoadingConfig.getVarLengthDictionaryColumns())); + _columnIndicesForRealtimeTable = + new ColumnIndicesForRealtimeTable(sortedColumn, new ArrayList<>(indexLoadingConfig.getInvertedIndexColumns()), + new ArrayList<>(indexLoadingConfig.getTextIndexColumns()), + new ArrayList<>(indexLoadingConfig.getFSTIndexColumns()), + new ArrayList<>(indexLoadingConfig.getNoDictionaryColumns()), + new ArrayList<>(indexLoadingConfig.getVarLengthDictionaryColumns())); // Start new realtime segment String consumerDir = realtimeTableDataManager.getConsumerDir(); RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = new RealtimeSegmentConfig.Builder(indexLoadingConfig).setTableNameWithType(_tableNameWithType) - .setSegmentName(_segmentNameStr) - 
.setStreamName(streamTopic).setSchema(_schema).setTimeColumnName(timeColumnName) - .setCapacity(_segmentMaxRowCount).setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount()) - .setSegmentZKMetadata(segmentZKMetadata) - .setOffHeap(_isOffHeap).setMemoryManager(_memoryManager) + .setSegmentName(_segmentNameStr).setStreamName(streamTopic).setSchema(_schema) + .setTimeColumnName(timeColumnName).setCapacity(_segmentMaxRowCount) + .setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount()) + .setSegmentZKMetadata(segmentZKMetadata).setOffHeap(_isOffHeap).setMemoryManager(_memoryManager) .setStatsHistory(realtimeTableDataManager.getStatsHistory()) .setAggregateMetrics(indexingConfig.isAggregateMetrics()) .setIngestionAggregationConfigs(IngestionConfigUtils.getAggregationConfigs(tableConfig)) - .setNullHandlingEnabled(_nullHandlingEnabled) - .setConsumerDir(consumerDir).setUpsertMode(tableConfig.getUpsertMode()) + .setNullHandlingEnabled(_nullHandlingEnabled).setConsumerDir(consumerDir) Review Comment: nit: Unnecessary formatting ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java: ########## @@ -1451,17 +1451,16 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo setConsumeEndTime(segmentZKMetadata, _consumeStartTime); _segmentCommitterFactory = new SegmentCommitterFactory(_segmentLogger, _protocolHandler, tableConfig, indexLoadingConfig, serverMetrics); - _segmentLogger - .info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", llcSegmentName, - _segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC)); + _segmentLogger.info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", + llcSegmentName, _segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC)); startConsumerThread(); } catch (Exception e) { // In case of exception thrown here, segment goes to ERROR 
state. Then any attempt to reset the segment from // ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the semaphore is acquired, but not released. // Hence releasing the semaphore here to unblock reset operation via Helix Admin. _partitionGroupConsumerSemaphore.release(); - _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), - "Failed to initialize segment data manager", e)); + _realtimeTableDataManager.addSegmentError(_segmentNameStr, Review Comment: nit: Unnecessary formatting ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java: ########## @@ -1386,27 +1387,26 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo _nullHandlingEnabled = indexingConfig.isNullHandlingEnabled(); - _columnIndicesForRealtimeTable = new ColumnIndicesForRealtimeTable(sortedColumn, - new ArrayList<>(indexLoadingConfig.getInvertedIndexColumns()), - new ArrayList<>(indexLoadingConfig.getTextIndexColumns()), - new ArrayList<>(indexLoadingConfig.getFSTIndexColumns()), - new ArrayList<>(indexLoadingConfig.getNoDictionaryColumns()), - new ArrayList<>(indexLoadingConfig.getVarLengthDictionaryColumns())); + _columnIndicesForRealtimeTable = + new ColumnIndicesForRealtimeTable(sortedColumn, new ArrayList<>(indexLoadingConfig.getInvertedIndexColumns()), Review Comment: nit: Unnecessary formatting ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java: ########## @@ -1309,15 +1310,15 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo _indexLoadingConfig = indexLoadingConfig; _schema = schema; _serverMetrics = serverMetrics; + _partitionUpsertMetadataManager = partitionUpsertMetadataManager; _isReadyToConsumeData = isReadyToConsumeData; _segmentVersion = indexLoadingConfig.getSegmentVersion(); _instanceId = _realtimeTableDataManager.getServerInstance(); _leaseExtender = 
SegmentBuildTimeLeaseExtender.getLeaseExtender(_tableNameWithType); _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType); CompletionConfig completionConfig = _tableConfig.getValidationConfig().getCompletionConfig(); - _segmentCompletionMode = completionConfig != null - && CompletionMode.DOWNLOAD.toString().equalsIgnoreCase(completionConfig.getCompletionMode()) - ? CompletionMode.DOWNLOAD : CompletionMode.DEFAULT; + _segmentCompletionMode = completionConfig != null && CompletionMode.DOWNLOAD.toString() Review Comment: nit: Unnecessary formatting -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org