This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new caac81d Cleaning up getTableName() for segment zk metadata (#4288) caac81d is described below commit caac81dfca7bb2de927e2c6306b554ac5cf5d6e6 Author: Seunghyun Lee <sn...@linkedin.com> AuthorDate: Mon Jun 10 13:00:59 2019 -0700 Cleaning up getTableName() for segment zk metadata (#4288) * Cleaning up getTableName() for segment zk metadata Removing unnecessary calls to segmentZKMetadata.getTableName() * Updated variable names for HL,LLRealtimeSegmentDataManager --- .../helix/core/PinotHelixResourceManager.java | 47 ++++++++++------------ .../helix/core/retention/RetentionManager.java | 4 +- .../core/retention/strategy/RetentionStrategy.java | 3 +- .../retention/strategy/TimeRetentionStrategy.java | 6 +-- .../strategy/TimeRetentionStrategyTest.java | 15 +++---- .../realtime/HLRealtimeSegmentDataManager.java | 19 +++++---- .../realtime/LLRealtimeSegmentDataManager.java | 13 +++--- .../starter/helix/SegmentFetcherAndLoader.java | 13 +++--- 8 files changed, 57 insertions(+), 63 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 877458d..cb57eb4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -1632,7 +1632,7 @@ public class PinotHelixResourceManager { "Failed to update ZK metadata for segment: " + segmentName + " of table: " + offlineTableName); } LOGGER.info("Updated segment: {} of table: {} to property store", segmentName, offlineTableName); - final String rawTableName = offlineSegmentZKMetadata.getTableName(); + final String rawTableName = TableNameBuilder.extractRawTableName(offlineTableName); TableConfig tableConfig = ZKMetadataProvider.getOfflineTableConfig(_propertyStore, rawTableName); Preconditions.checkNotNull(tableConfig); @@ -1640,14 +1640,14 @@ public class PinotHelixResourceManager { // Send a message to the servers to update the segment. // We return success even if we are not able to send messages (which can happen if no servers are alive). // For segment validation errors we would have returned earlier. - sendSegmentRefreshMessage(offlineSegmentZKMetadata); + sendSegmentRefreshMessage(offlineTableName, offlineSegmentZKMetadata); // Send a message to the brokers to update the table's time boundary info if the segment push type is APPEND. if (shouldSendTimeboundaryRefreshMsg(rawTableName, tableConfig)) { - sendTimeboundaryRefreshMessageToBrokers(offlineSegmentZKMetadata); + sendTimeboundaryRefreshMessageToBrokers(offlineTableName, offlineSegmentZKMetadata); } } else { // Go through the ONLINE->OFFLINE->ONLINE state transition to update the segment - if (!updateExistedSegment(offlineSegmentZKMetadata)) { + if (!updateExistedSegment(offlineTableName, offlineSegmentZKMetadata)) { LOGGER.error("Failed to refresh segment: {} of table: {} by the ONLINE->OFFLINE->ONLINE state transition", segmentName, offlineTableName); } @@ -1655,13 +1655,13 @@ public class PinotHelixResourceManager { } // Send a message to the pinot brokers to notify them to update its Timeboundary Info. - private void sendTimeboundaryRefreshMessageToBrokers(OfflineSegmentZKMetadata segmentZKMetadata) { + private void sendTimeboundaryRefreshMessageToBrokers(String tableNameWithType, + OfflineSegmentZKMetadata segmentZKMetadata) { final String segmentName = segmentZKMetadata.getSegmentName(); - final String rawTableName = segmentZKMetadata.getTableName(); - final String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName); + final String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); final int timeoutMs = -1; // Infinite timeout on the recipient. - TimeboundaryRefreshMessage refreshMessage = new TimeboundaryRefreshMessage(offlineTableName, segmentName); + TimeboundaryRefreshMessage refreshMessage = new TimeboundaryRefreshMessage(tableNameWithType, segmentName); Criteria recipientCriteria = new Criteria(); // Currently Helix does not support send message to a Spectator. So we walk around the problem by sending the @@ -1672,7 +1672,7 @@ public class PinotHelixResourceManager { recipientCriteria.setResource(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); recipientCriteria.setDataSource(Criteria.DataSource.EXTERNALVIEW); // The brokerResource field in the EXTERNALVIEW stores the offline table name in the Partition subfield. - recipientCriteria.setPartition(offlineTableName); + recipientCriteria.setPartition(tableNameWithType); ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); LOGGER.info("Sending timeboundary refresh message for segment {} of table {}:{} to recipients {}", segmentName, @@ -1686,7 +1686,7 @@ public class PinotHelixResourceManager { // May be the case when none of the brokers are up yet. That is OK, because when they come up they will get // the latest time boundary info. LOGGER.warn("Unable to send timeboundary refresh message for {} of table {}, nMsgs={}", segmentName, - offlineTableName, nMsgsSent); + tableNameWithType, nMsgsSent); } } @@ -1770,12 +1770,13 @@ public class PinotHelixResourceManager { * The message is sent as session-specific, so if a new zk session is created (e.g. server restarts) * it will not get the message. * + * @param tableNameWithType Table name with type * @param segmentZKMetadata is the metadata of the newly arrived segment. */ // NOTE: method should be thread-safe - private void sendSegmentRefreshMessage(OfflineSegmentZKMetadata segmentZKMetadata) { + private void sendSegmentRefreshMessage(String tableNameWithType, OfflineSegmentZKMetadata segmentZKMetadata) { final String segmentName = segmentZKMetadata.getSegmentName(); - final String rawTableName = segmentZKMetadata.getTableName(); + final String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); final String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName); final int timeoutMs = -1; // Infinite timeout on the recipient. @@ -1842,27 +1843,21 @@ public class PinotHelixResourceManager { HelixHelper.addSegmentToIdealState(_helixZkManager, offlineTableName, segmentName, assignedInstances); } - private boolean updateExistedSegment(SegmentZKMetadata segmentZKMetadata) { - final String tableName; - if (segmentZKMetadata instanceof RealtimeSegmentZKMetadata) { - tableName = TableNameBuilder.REALTIME.tableNameWithType(segmentZKMetadata.getTableName()); - } else { - tableName = TableNameBuilder.OFFLINE.tableNameWithType(segmentZKMetadata.getTableName()); - } + private boolean updateExistedSegment(String tableNameWithType, SegmentZKMetadata segmentZKMetadata) { final String segmentName = segmentZKMetadata.getSegmentName(); HelixDataAccessor helixDataAccessor = _helixZkManager.getHelixDataAccessor(); - PropertyKey idealStatePropertyKey = _keyBuilder.idealStates(tableName); + PropertyKey idealStatePropertyKey = _keyBuilder.idealStates(tableNameWithType); // Set all partitions to offline to unload them from the servers boolean updateSuccessful; do { - final IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName); + final IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType); final Set<String> instanceSet = idealState.getInstanceSet(segmentName); if (instanceSet == null || instanceSet.size() == 0) { // We are trying to refresh a segment, but there are no instances currently assigned for fielding this segment. // When those instances do come up, the segment will be uploaded correctly, so return success but log a warning. - LOGGER.warn("No instances as yet for segment {}, table {}", segmentName, tableName); + LOGGER.warn("No instances as yet for segment {}, table {}", segmentName, tableNameWithType); return true; } for (final String instance : instanceSet) { @@ -1872,7 +1867,7 @@ public class PinotHelixResourceManager { } while (!updateSuccessful); // Check that the ideal state has been written to ZK - IdealState updatedIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName); + IdealState updatedIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType); Map<String, String> instanceStateMap = updatedIdealState.getInstanceStateMap(segmentName); for (String state : instanceStateMap.values()) { if (!"OFFLINE".equals(state)) { @@ -1883,7 +1878,7 @@ public class PinotHelixResourceManager { // Wait until the partitions are offline in the external view LOGGER.info("Wait until segment - " + segmentName + " to be OFFLINE in ExternalView"); - if (!ifExternalViewChangeReflectedForState(tableName, segmentName, "OFFLINE", + if (!ifExternalViewChangeReflectedForState(tableNameWithType, segmentName, "OFFLINE", _externalViewOnlineToOfflineTimeoutMillis, false)) { LOGGER .error("External view for segment {} did not reflect the ideal state of OFFLINE within the {} ms time limit", @@ -1893,7 +1888,7 @@ public class PinotHelixResourceManager { // Set all partitions to online so that they load the new segment data do { - final IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName); + final IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType); final Set<String> instanceSet = idealState.getInstanceSet(segmentName); LOGGER.info("Found {} instances for segment '{}', in ideal state", instanceSet.size(), segmentName); for (final String instance : instanceSet) { @@ -1904,7 +1899,7 @@ public class PinotHelixResourceManager { } while (!updateSuccessful); // Check that the ideal state has been written to ZK - updatedIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName); + updatedIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType); instanceStateMap = updatedIdealState.getInstanceStateMap(segmentName); LOGGER .info("Found {} instances for segment '{}', after updating ideal state", instanceStateMap.size(), segmentName); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java index b5595e7..7d5182d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java @@ -113,7 +113,7 @@ public class RetentionManager extends ControllerPeriodicTask<Void> { List<String> segmentsToDelete = new ArrayList<>(); for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : _pinotHelixResourceManager .getOfflineSegmentMetadata(offlineTableName)) { - if (retentionStrategy.isPurgeable(offlineSegmentZKMetadata)) { + if (retentionStrategy.isPurgeable(offlineTableName, offlineSegmentZKMetadata)) { segmentsToDelete.add(offlineSegmentZKMetadata.getSegmentName()); } } @@ -141,7 +141,7 @@ public class RetentionManager extends ControllerPeriodicTask<Void> { } } else { // Sealed segment - if (retentionStrategy.isPurgeable(realtimeSegmentZKMetadata)) { + if (retentionStrategy.isPurgeable(realtimeTableName, realtimeSegmentZKMetadata)) { segmentsToDelete.add(segmentName); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java index 5917c9a..e8f6336 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java @@ -29,8 +29,9 @@ public interface RetentionStrategy { /** * Returns whether the segment should be purged * + * @param tableNameWithType Table name with type * @param segmentZKMetadata Segment ZK metadata * @return Whether the segment should be purged */ - boolean isPurgeable(SegmentZKMetadata segmentZKMetadata); + boolean isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZKMetadata); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java index 84373c5..167e030 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java @@ -38,11 +38,11 @@ public class TimeRetentionStrategy implements RetentionStrategy { } @Override - public boolean isPurgeable(SegmentZKMetadata segmentZKMetadata) { + public boolean isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZKMetadata) { TimeUnit timeUnit = segmentZKMetadata.getTimeUnit(); if (timeUnit == null) { LOGGER.warn("Time unit is not set for {} segment: {} of table: {}", segmentZKMetadata.getSegmentType(), - segmentZKMetadata.getSegmentName(), segmentZKMetadata.getTableName()); + segmentZKMetadata.getSegmentName(), tableNameWithType); return false; } long endTime = segmentZKMetadata.getEndTime(); @@ -51,7 +51,7 @@ public class TimeRetentionStrategy implements RetentionStrategy { // Check that the end time is between 1971 and 2071 if (!TimeUtils.timeValueInValidRange(endTimeMs)) { LOGGER.warn("{} segment: {} of table: {} has invalid end time: {} {}", segmentZKMetadata.getSegmentType(), - segmentZKMetadata.getSegmentName(), segmentZKMetadata.getTableName(), endTime, timeUnit); + segmentZKMetadata.getSegmentName(), tableNameWithType, endTime, timeUnit); return false; } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategyTest.java index b2533aa..009b397 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategyTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategyTest.java @@ -34,34 +34,35 @@ public class TimeRetentionStrategyTest { @Test public void testTimeRetention() { + String tableNameWithType = "myTable_OFFLINE"; TimeRetentionStrategy retentionStrategy = new TimeRetentionStrategy(TimeUnit.DAYS, 30L); SegmentZKMetadata metadata = new OfflineSegmentZKMetadata(); // Without setting time unit or end time, should not throw exception - assertFalse(retentionStrategy.isPurgeable(metadata)); + assertFalse(retentionStrategy.isPurgeable(tableNameWithType, metadata)); metadata.setTimeUnit(TimeUnit.DAYS); - assertFalse(retentionStrategy.isPurgeable(metadata)); + assertFalse(retentionStrategy.isPurgeable(tableNameWithType, metadata)); // Set end time to Jan 2nd, 1970 (not purgeable due to bogus timestamp) metadata.setEndTime(1L); - assertFalse(retentionStrategy.isPurgeable(metadata)); + assertFalse(retentionStrategy.isPurgeable(tableNameWithType, metadata)); // Set end time to today long today = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis()); metadata.setEndTime(today); - assertFalse(retentionStrategy.isPurgeable(metadata)); + assertFalse(retentionStrategy.isPurgeable(tableNameWithType, metadata)); // Set end time to two weeks ago metadata.setEndTime(today - 14); - assertFalse(retentionStrategy.isPurgeable(metadata)); + assertFalse(retentionStrategy.isPurgeable(tableNameWithType, metadata)); // Set end time to two months ago (purgeable due to being past the retention period) metadata.setEndTime(today - 60); - assertTrue(retentionStrategy.isPurgeable(metadata)); + assertTrue(retentionStrategy.isPurgeable(tableNameWithType, metadata)); // Set end time to 200 years in the future (not purgeable due to bogus timestamp) metadata.setEndTime(today + (365 * 200)); - assertFalse(retentionStrategy.isPurgeable(metadata)); + assertFalse(retentionStrategy.isPurgeable(tableNameWithType, metadata)); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java index 45e0128..9b5c542 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java @@ -51,7 +51,6 @@ import org.apache.pinot.core.realtime.stream.StreamConfig; import org.apache.pinot.core.realtime.stream.StreamConsumerFactory; import org.apache.pinot.core.realtime.stream.StreamConsumerFactoryProvider; import org.apache.pinot.core.realtime.stream.StreamLevelConsumer; -import org.apache.pinot.core.realtime.stream.StreamMessageMetadata; import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -63,7 +62,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { private static final Logger LOGGER = LoggerFactory.getLogger(HLRealtimeSegmentDataManager.class); private final static long ONE_MINUTE_IN_MILLSEC = 1000 * 60; - private final String tableName; + private final String tableNameWithType; private final String segmentName; private final Schema schema; private final String timeColumnName; @@ -109,7 +108,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _recordTransformer = CompoundTransformer.getDefaultTransformer(schema); this.serverMetrics = serverMetrics; this.segmentName = realtimeSegmentZKMetadata.getSegmentName(); - this.tableName = tableConfig.getTableName(); + this.tableNameWithType = tableConfig.getTableName(); this.timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); List<String> sortedColumns = indexLoadingConfig.getSortedColumns(); @@ -164,10 +163,10 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig); String clientId = HLRealtimeSegmentDataManager.class.getSimpleName() + "-" + _streamConfig.getTopicName(); _streamLevelConsumer = - _streamConsumerFactory.createStreamLevelConsumer(clientId, tableName, schema, instanceMetadata, serverMetrics); + _streamConsumerFactory.createStreamLevelConsumer(clientId, tableNameWithType, schema, instanceMetadata, serverMetrics); _streamLevelConsumer.start(); - tableStreamName = tableName + "_" + _streamConfig.getTopicName(); + tableStreamName = tableNameWithType + "_" + _streamConfig.getTopicName(); IndexingConfig indexingConfig = tableConfig.getIndexingConfig(); if (indexingConfig != null && indexingConfig.isAggregateMetrics()) { @@ -258,7 +257,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // lets convert the segment now RealtimeSegmentConverter converter = new RealtimeSegmentConverter(realtimeSegment, tempSegmentFolder.getAbsolutePath(), schema, - realtimeSegmentZKMetadata.getTableName(), timeColumnName, realtimeSegmentZKMetadata.getSegmentName(), + tableNameWithType, timeColumnName, realtimeSegmentZKMetadata.getSegmentName(), sortedColumn, HLRealtimeSegmentDataManager.this.invertedIndexColumns, noDictionaryColumns, null/*StarTreeIndexSpec*/); // Star tree not supported for HLC. @@ -339,7 +338,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { segmentLogger .error("FATAL: Exception committing or shutting down consumer commitSuccessful={}", commitSuccessful, e); - serverMetrics.addMeteredTableValue(tableName, ServerMeter.REALTIME_OFFSET_COMMIT_EXCEPTIONS, 1L); + serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.REALTIME_OFFSET_COMMIT_EXCEPTIONS, 1L); if (!commitSuccessful) { _streamLevelConsumer.shutdown(); } @@ -348,7 +347,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { try { segmentLogger.info("Marking current segment as completed in Helix"); RealtimeSegmentZKMetadata metadataToOverwrite = new RealtimeSegmentZKMetadata(); - metadataToOverwrite.setTableName(realtimeSegmentZKMetadata.getTableName()); + metadataToOverwrite.setTableName(tableNameWithType); metadataToOverwrite.setSegmentName(realtimeSegmentZKMetadata.getSegmentName()); metadataToOverwrite.setSegmentType(SegmentType.OFFLINE); metadataToOverwrite.setStatus(Status.DONE); @@ -377,7 +376,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { }); indexingThread.start(); - serverMetrics.addValueToTableGauge(tableName, ServerGauge.SEGMENT_COUNT, 1L); + serverMetrics.addValueToTableGauge(tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L); segmentLogger.debug("scheduling keepIndexing timer check"); // start a schedule timer to keep track of the segment TimerService.timer.schedule(segmentStatusTask, ONE_MINUTE_IN_MILLSEC, ONE_MINUTE_IN_MILLSEC); @@ -416,7 +415,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { private void updateCurrentDocumentCountMetrics() { int currentRawDocs = realtimeSegment.getNumDocsIndexed(); serverMetrics - .addValueToTableGauge(tableName, ServerGauge.DOCUMENT_COUNT, (currentRawDocs - lastUpdatedRawDocuments.get())); + .addValueToTableGauge(tableNameWithType, ServerGauge.DOCUMENT_COUNT, (currentRawDocs - lastUpdatedRawDocuments.get())); lastUpdatedRawDocuments.set(currentRawDocs); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 7fcbd9c..59d2bb8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -68,7 +68,6 @@ import org.apache.pinot.core.realtime.stream.StreamConsumerFactory; import org.apache.pinot.core.realtime.stream.StreamConsumerFactoryProvider; import org.apache.pinot.core.realtime.stream.StreamDecoderProvider; import org.apache.pinot.core.realtime.stream.StreamMessageDecoder; -import org.apache.pinot.core.realtime.stream.StreamMessageMetadata; import org.apache.pinot.core.realtime.stream.StreamMetadataProvider; import org.apache.pinot.core.realtime.stream.TransientConsumerException; import org.apache.pinot.core.segment.creator.impl.V1Constants; @@ -234,7 +233,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { private PartitionLevelConsumer _partitionLevelConsumer = null; private StreamMetadataProvider _streamMetadataProvider = null; private final File _resourceTmpDir; - private final String _tableName; + private final String _tableNameWithType; private final String _timeColumnName; private final List<String> _invertedIndexColumns; private final List<String> _noDictionaryColumns; @@ -661,7 +660,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // lets convert the segment now RealtimeSegmentConverter converter = new RealtimeSegmentConverter(_realtimeSegment, tempSegmentFolder.getAbsolutePath(), _schema, - _segmentZKMetadata.getTableName(), _timeColumnName, _segmentZKMetadata.getSegmentName(), _sortedColumn, + _tableNameWithType, _timeColumnName, _segmentZKMetadata.getSegmentName(), _sortedColumn, _invertedIndexColumns, _noDictionaryColumns, _starTreeIndexSpec); segmentLogger.info("Trying to build segment"); try { @@ -1047,11 +1046,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _segmentNameStr = _segmentZKMetadata.getSegmentName(); _segmentName = new LLCSegmentName(_segmentNameStr); _streamPartitionId = _segmentName.getPartitionId(); - _tableName = _tableConfig.getTableName(); + _tableNameWithType = _tableConfig.getTableName(); _timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); - _metricKeyName = _tableName + "-" + _streamTopic + "-" + _streamPartitionId; + _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _streamPartitionId; segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr); - _tableStreamName = _tableName + "_" + _streamTopic; + _tableStreamName = _tableNameWithType + "_" + _streamTopic; _memoryManager = getMemoryManager(realtimeTableDataManager.getConsumerDir(), _segmentNameStr, indexLoadingConfig.isRealtimeOffheapAllocation(), indexLoadingConfig.isDirectRealtimeOffheapAllocation(), serverMetrics); @@ -1193,7 +1192,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // Number of rows indexed should be used for DOCUMENT_COUNT metric, and also for segment flush. Whereas, // Number of rows consumed should be used for consumption metric. long rowsIndexed = _numRowsIndexed - _lastUpdatedRowsIndexed.get(); - _serverMetrics.addValueToTableGauge(_tableName, ServerGauge.DOCUMENT_COUNT, rowsIndexed); + _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.DOCUMENT_COUNT, rowsIndexed); _lastUpdatedRowsIndexed.set(_numRowsIndexed); final long now = now(); final int rowsConsumed = _numRowsConsumed - _lastConsumedCount; diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java index 5126525..d92fce2 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java @@ -111,7 +111,7 @@ public class SegmentFetcherAndLoader { localSegmentMetadata = null; } try { - if (!isNewSegmentMetadata(newSegmentZKMetadata, localSegmentMetadata)) { + if (!isNewSegmentMetadata(tableNameWithType, newSegmentZKMetadata, localSegmentMetadata)) { LOGGER.info("Segment metadata same as before, loading {} of table {} (crc {}) from disk", segmentName, tableNameWithType, localSegmentMetadata.getCrc()); _instanceDataManager.addOfflineSegment(tableNameWithType, segmentName, indexDir); @@ -143,7 +143,7 @@ public class SegmentFetcherAndLoader { // If we get here, then either it is the case that we have the segment loaded in memory (and therefore present // in disk) or, we need to load from the server. In the former case, we still need to check if the metadata // that we have is different from that in zookeeper. - if (isNewSegmentMetadata(newSegmentZKMetadata, localSegmentMetadata)) { + if (isNewSegmentMetadata(tableNameWithType, newSegmentZKMetadata, localSegmentMetadata)) { if (localSegmentMetadata == null) { LOGGER.info("Loading new segment {} of table {} from controller", segmentName, tableNameWithType); } else { @@ -172,20 +172,19 @@ public class SegmentFetcherAndLoader { } } - private boolean isNewSegmentMetadata(@Nonnull OfflineSegmentZKMetadata newSegmentZKMetadata, - @Nullable SegmentMetadata existedSegmentMetadata) { - String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(newSegmentZKMetadata.getTableName()); + private boolean isNewSegmentMetadata(@Nonnull String tableNameWithType, + @Nonnull OfflineSegmentZKMetadata newSegmentZKMetadata, @Nullable SegmentMetadata existedSegmentMetadata) { String segmentName = newSegmentZKMetadata.getSegmentName(); if (existedSegmentMetadata == null) { - LOGGER.info("Existed segment metadata is null for segment: {} in table: {}", segmentName, offlineTableName); + LOGGER.info("Existed segment metadata is null for segment: {} in table: {}", segmentName, tableNameWithType); return true; } long newCrc = newSegmentZKMetadata.getCrc(); long existedCrc = Long.valueOf(existedSegmentMetadata.getCrc()); LOGGER.info("New segment CRC: {}, existed segment CRC: {} for segment: {} in table: {}", newCrc, existedCrc, - segmentName, offlineTableName); + segmentName, tableNameWithType); return newCrc != existedCrc; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org