This is an automated email from the ASF dual-hosted git repository. xbli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 8e10320595 handle absent segments so that catchup checker doesn't get stuck on them (#12883) 8e10320595 is described below commit 8e103205955e8af4fe286ebd6e97b30605724be2 Author: Xiaobing <61892277+klsi...@users.noreply.github.com> AuthorDate: Mon Apr 22 16:41:46 2024 -0700 handle absent segments so that catchup checker doesn't get stuck on them (#12883) * skip missing segments while checking freshness during server startup * get new consuming segments again if current consuming segments are committed by other servers --- .../server/starter/helix/BaseServerStarter.java | 71 +++++++----- .../FreshnessBasedConsumptionStatusChecker.java | 7 +- .../IngestionBasedConsumptionStatusChecker.java | 128 ++++++++++++++------- .../helix/OffsetBasedConsumptionStatusChecker.java | 7 +- .../helix/ConsumptionStatusCheckerTestUtils.java | 38 ++++++ ...FreshnessBasedConsumptionStatusCheckerTest.java | 103 ++++++++++++++--- .../OffsetBasedConsumptionStatusCheckerTest.java | 32 ++++-- 7 files changed, 288 insertions(+), 98 deletions(-) diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index 02c7b81ea5..78cd1a14e7 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -153,8 +154,8 @@ public abstract class BaseServerStarter implements ServiceStartable { _helixClusterName = _serverConf.getProperty(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME); ServiceStartableUtils.applyClusterConfig(_serverConf, _zkAddress, _helixClusterName, ServiceRole.SERVER); - PinotInsecureMode.setPinotInInsecureMode( - Boolean.valueOf(_serverConf.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE, + PinotInsecureMode.setPinotInInsecureMode(Boolean.parseBoolean( + _serverConf.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE, CommonConstants.DEFAULT_PINOT_INSECURE_MODE))); setupHelixSystemProperties(); @@ -275,8 +276,7 @@ public abstract class BaseServerStarter implements ServiceStartable { // collect all resources which have this instance in the ideal state List<String> resourcesToMonitor = new ArrayList<>(); - - Set<String> consumingSegments = new HashSet<>(); + Map<String, Set<String>> consumingSegments = new HashMap<>(); boolean checkRealtime = realtimeConsumptionCatchupWaitMs > 0; if (isFreshnessStatusCheckerEnabled && realtimeMinFreshnessMs <= 0) { LOGGER.warn("Realtime min freshness {} must be > 0. Setting relatime min freshness to default {}.", @@ -289,23 +289,22 @@ public abstract class BaseServerStarter implements ServiceStartable { if (!TableNameBuilder.isTableResource(resourceName)) { continue; } - // Only monitor enabled resources IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, resourceName); - if (idealState.isEnabled()) { - - for (String partitionName : idealState.getPartitionSet()) { - if (idealState.getInstanceSet(partitionName).contains(_instanceId)) { - resourcesToMonitor.add(resourceName); - break; - } + if (idealState == null || !idealState.isEnabled()) { + continue; + } + for (String partitionName : idealState.getPartitionSet()) { + if (idealState.getInstanceSet(partitionName).contains(_instanceId)) { + resourcesToMonitor.add(resourceName); + break; } - if (checkRealtime && TableNameBuilder.isRealtimeTableResource(resourceName)) { - for (String partitionName : idealState.getPartitionSet()) { - if (StateModel.SegmentStateModel.CONSUMING.equals( - idealState.getInstanceStateMap(partitionName).get(_instanceId))) { - consumingSegments.add(partitionName); - } + } + if (checkRealtime && TableNameBuilder.isRealtimeTableResource(resourceName)) { + for (String partitionName : idealState.getPartitionSet()) { + if (StateModel.SegmentStateModel.CONSUMING.equals( + idealState.getInstanceStateMap(partitionName).get(_instanceId))) { + consumingSegments.computeIfAbsent(resourceName, k -> new HashSet<>()).add(partitionName); } } } @@ -332,7 +331,7 @@ public abstract class BaseServerStarter implements ServiceStartable { realtimeMinFreshnessMs, idleTimeoutMs); FreshnessBasedConsumptionStatusChecker freshnessStatusChecker = new FreshnessBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), consumingSegments, - realtimeMinFreshnessMs, idleTimeoutMs); + this::getConsumingSegments, realtimeMinFreshnessMs, idleTimeoutMs); Supplier<Integer> getNumConsumingSegmentsNotReachedMinFreshness = freshnessStatusChecker::getNumConsumingSegmentsNotReachedIngestionCriteria; serviceStatusCallbackListBuilder.add( @@ -341,7 +340,8 @@ public abstract class BaseServerStarter implements ServiceStartable { } else if (isOffsetBasedConsumptionStatusCheckerEnabled) { LOGGER.info("Setting up offset based status checker"); OffsetBasedConsumptionStatusChecker consumptionStatusChecker = - new OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), consumingSegments); + new OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), consumingSegments, + this::getConsumingSegments); Supplier<Integer> getNumConsumingSegmentsNotReachedTheirLatestOffset = consumptionStatusChecker::getNumConsumingSegmentsNotReachedIngestionCriteria; serviceStatusCallbackListBuilder.add( @@ -359,6 +359,22 @@ public abstract class BaseServerStarter implements ServiceStartable { new ServiceStatus.MultipleCallbackServiceStatusCallback(serviceStatusCallbackListBuilder.build())); } + @Nullable + private Set<String> getConsumingSegments(String realtimeTableName) { + IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, realtimeTableName); + if (idealState == null || !idealState.isEnabled()) { + return null; + } + Set<String> consumingSegments = new HashSet<>(); + for (String partitionName : idealState.getPartitionSet()) { + if (StateModel.SegmentStateModel.CONSUMING.equals( + idealState.getInstanceStateMap(partitionName).get(_instanceId))) { + consumingSegments.add(partitionName); + } + } + return consumingSegments; + } + private void updateInstanceConfigIfNeeded(ServerConf serverConf) { InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(_helixManager, _instanceId); @@ -518,12 +534,13 @@ public abstract class BaseServerStarter implements ServiceStartable { } } - boolean exitServerOnIncompleteStartup = _serverConf.getProperty( - Server.CONFIG_OF_EXIT_ON_SERVICE_STATUS_CHECK_FAILURE, - Server.DEFAULT_EXIT_ON_SERVICE_STATUS_CHECK_FAILURE); + boolean exitServerOnIncompleteStartup = + _serverConf.getProperty(Server.CONFIG_OF_EXIT_ON_SERVICE_STATUS_CHECK_FAILURE, + Server.DEFAULT_EXIT_ON_SERVICE_STATUS_CHECK_FAILURE); if (exitServerOnIncompleteStartup) { - String errorMessage = String.format("Service status %s has not turned GOOD within %dms: %s. Exiting server.", - serviceStatus, System.currentTimeMillis() - startTimeMs, ServiceStatus.getStatusDescription()); + String errorMessage = + String.format("Service status %s has not turned GOOD within %dms: %s. Exiting server.", serviceStatus, + System.currentTimeMillis() - startTimeMs, ServiceStatus.getStatusDescription()); throw new IllegalStateException(errorMessage); } LOGGER.warn("Service status has not turned GOOD within {}ms: {}", System.currentTimeMillis() - startTimeMs, @@ -581,8 +598,8 @@ public abstract class BaseServerStarter implements ServiceStartable { InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager(); instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() -> _isServerReadyToServeQueries); // initialize the thread accountant for query killing - Tracing.ThreadAccountantOps - .initializeThreadAccountant(_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId); + Tracing.ThreadAccountantOps.initializeThreadAccountant( + _serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId); initSegmentFetcher(_serverConf); StateModelFactory<?> stateModelFactory = new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java index 6f3610e596..77eac3832e 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java @@ -19,7 +19,9 @@ package org.apache.pinot.server.starter.helix; +import java.util.Map; import java.util.Set; +import java.util.function.Function; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; @@ -37,9 +39,10 @@ public class FreshnessBasedConsumptionStatusChecker extends IngestionBasedConsum private final long _minFreshnessMs; private final long _idleTimeoutMs; - public FreshnessBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, Set<String> consumingSegments, + public FreshnessBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, + Map<String, Set<String>> consumingSegments, Function<String, Set<String>> consumingSegmentsSupplier, long minFreshnessMs, long idleTimeoutMs) { - super(instanceDataManager, consumingSegments); + super(instanceDataManager, consumingSegments, consumingSegmentsSupplier); _minFreshnessMs = minFreshnessMs; _idleTimeoutMs = idleTimeoutMs; } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java index 83de35a63c..c6fe0d16d6 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java @@ -19,15 +19,16 @@ package org.apache.pinot.server.starter.helix; +import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; import java.util.Set; -import org.apache.pinot.common.utils.LLCSegmentName; +import java.util.function.Function; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; -import org.apache.pinot.spi.config.table.TableType; -import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,64 +36,103 @@ import org.slf4j.LoggerFactory; public abstract class IngestionBasedConsumptionStatusChecker { protected final Logger _logger = LoggerFactory.getLogger(getClass()); - // constructor parameters - protected final InstanceDataManager _instanceDataManager; - protected final Set<String> _consumingSegments; - - // helper variable - private final Set<String> _caughtUpSegments = new HashSet<>(); + private final InstanceDataManager _instanceDataManager; + private final Map<String, Set<String>> _consumingSegmentsByTable; + private final Map<String, Set<String>> _caughtUpSegmentsByTable = new HashMap<>(); + private final Function<String, Set<String>> _consumingSegmentsSupplier; + /** + * Both consumingSegmentsByTable and consumingSegmentsSupplier are provided as it can be costly to get + * consumingSegmentsByTable via the supplier, so only use it when any missing segment is detected. + */ public IngestionBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, - Set<String> consumingSegments) { + Map<String, Set<String>> consumingSegmentsByTable, Function<String, Set<String>> consumingSegmentsSupplier) { _instanceDataManager = instanceDataManager; - _consumingSegments = consumingSegments; + _consumingSegmentsByTable = consumingSegmentsByTable; + _consumingSegmentsSupplier = consumingSegmentsSupplier; } - public int getNumConsumingSegmentsNotReachedIngestionCriteria() { - for (String segName : _consumingSegments) { - if (_caughtUpSegments.contains(segName)) { - continue; - } - TableDataManager tableDataManager = getTableDataManager(segName); + // This might be called by multiple threads, thus synchronized to be correct. + public synchronized int getNumConsumingSegmentsNotReachedIngestionCriteria() { + // If the checker found any consuming segments are missing or committed for a table, it should reset the set of + // consuming segments for the table to continue to monitor the freshness, otherwise the checker might get stuck + // on deleted segments or tables, or miss new consuming segments created in the table and get ready prematurely. + Set<String> tablesToRefresh = new HashSet<>(); + Iterator<Map.Entry<String, Set<String>>> itr = _consumingSegmentsByTable.entrySet().iterator(); + while (itr.hasNext()) { + Map.Entry<String, Set<String>> tableSegments = itr.next(); + String tableNameWithType = tableSegments.getKey(); + TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType); if (tableDataManager == null) { - _logger.info("TableDataManager is not yet setup for segment {}. Will check consumption status later", segName); + _logger.info("No tableDataManager for table: {}. Refresh table's consuming segments", tableNameWithType); + tablesToRefresh.add(tableNameWithType); continue; } - SegmentDataManager segmentDataManager = null; - try { - segmentDataManager = tableDataManager.acquireSegment(segName); - if (segmentDataManager == null) { - _logger.info("SegmentDataManager is not yet setup for segment {}. Will check consumption status later", - segName); + Set<String> consumingSegments = tableSegments.getValue(); + Set<String> caughtUpSegments = _caughtUpSegmentsByTable.computeIfAbsent(tableNameWithType, k -> new HashSet<>()); + for (String segName : consumingSegments) { + if (caughtUpSegments.contains(segName)) { continue; } - if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) { - // There's a possibility that a consuming segment has converted to a committed segment. If that's the case, - // segment data manager will not be of type RealtimeSegmentDataManager. - _logger.info("Segment {} is already committed and is considered caught up.", segName); - _caughtUpSegments.add(segName); + SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segName); + if (segmentDataManager == null) { + _logger.info("No segmentDataManager for segment: {} from table: {}. Refresh table's consuming segments", + segName, tableNameWithType); + tablesToRefresh.add(tableNameWithType); continue; } - - RealtimeSegmentDataManager rtSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager; - if (isSegmentCaughtUp(segName, rtSegmentDataManager)) { - _caughtUpSegments.add(segName); - } - } finally { - if (segmentDataManager != null) { + try { + if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) { + // It's possible that the consuming segment has been committed by another server. In this case, we should + // get the new consuming segments for the table and continue to monitor their consumption status, until the + // current server catches up the consuming segments. + _logger.info("Segment: {} from table: {} is already committed. Refresh table's consuming segments.", + segName, tableNameWithType); + tablesToRefresh.add(tableNameWithType); + continue; + } + RealtimeSegmentDataManager rtSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager; + if (isSegmentCaughtUp(segName, rtSegmentDataManager)) { + caughtUpSegments.add(segName); + } + } finally { tableDataManager.releaseSegment(segmentDataManager); } } + int numLaggingSegments = consumingSegments.size() - caughtUpSegments.size(); + if (numLaggingSegments == 0) { + _logger.info("Consuming segments from table: {} have all caught up", tableNameWithType); + itr.remove(); + _caughtUpSegmentsByTable.remove(tableNameWithType); + } + } + if (!tablesToRefresh.isEmpty()) { + for (String tableNameWithType : tablesToRefresh) { + Set<String> updatedConsumingSegments = _consumingSegmentsSupplier.apply(tableNameWithType); + if (updatedConsumingSegments == null || updatedConsumingSegments.isEmpty()) { + _consumingSegmentsByTable.remove(tableNameWithType); + _caughtUpSegmentsByTable.remove(tableNameWithType); + _logger.info("Found no consuming segments from table: {}, which is probably removed", tableNameWithType); + } else { + _consumingSegmentsByTable.put(tableNameWithType, updatedConsumingSegments); + _caughtUpSegmentsByTable.computeIfAbsent(tableNameWithType, k -> new HashSet<>()) + .retainAll(updatedConsumingSegments); + _logger.info( + "Updated consumingSegments: {} and caughtUpSegments: {} for table: {}, as consuming segments were " + + "missing or committed", updatedConsumingSegments, _caughtUpSegmentsByTable.get(tableNameWithType), + tableNameWithType); + } + } } - return _consumingSegments.size() - _caughtUpSegments.size(); + int numLaggingSegments = 0; + for (Map.Entry<String, Set<String>> tableSegments : _consumingSegmentsByTable.entrySet()) { + String tableNameWithType = tableSegments.getKey(); + Set<String> consumingSegments = tableSegments.getValue(); + Set<String> caughtUpSegments = _caughtUpSegmentsByTable.computeIfAbsent(tableNameWithType, k -> new HashSet<>()); + numLaggingSegments += consumingSegments.size() - caughtUpSegments.size(); + } + return numLaggingSegments; } protected abstract boolean isSegmentCaughtUp(String segmentName, RealtimeSegmentDataManager rtSegmentDataManager); - - private TableDataManager getTableDataManager(String segmentName) { - LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); - String tableName = llcSegmentName.getTableName(); - String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName); - return _instanceDataManager.getTableDataManager(tableNameWithType); - } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java index 6b597e3fa2..ad7d2905ba 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java @@ -19,7 +19,9 @@ package org.apache.pinot.server.starter.helix; +import java.util.Map; import java.util.Set; +import java.util.function.Function; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; @@ -34,8 +36,9 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; */ public class OffsetBasedConsumptionStatusChecker extends IngestionBasedConsumptionStatusChecker { - public OffsetBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, Set<String> consumingSegments) { - super(instanceDataManager, consumingSegments); + public OffsetBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, + Map<String, Set<String>> consumingSegments, Function<String, Set<String>> consumingSegmentsSupplier) { + super(instanceDataManager, consumingSegments, consumingSegmentsSupplier); } @Override diff --git a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/ConsumptionStatusCheckerTestUtils.java b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/ConsumptionStatusCheckerTestUtils.java new file mode 100644 index 0000000000..ccd8f6f855 --- /dev/null +++ b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/ConsumptionStatusCheckerTestUtils.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.server.starter.helix; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + + +class ConsumptionStatusCheckerTestUtils { + private ConsumptionStatusCheckerTestUtils() { + } + + public static Function<String, Set<String>> getConsumingSegments(Map<String, Set<String>> consumingSegments) { + // Create a new Set instance to keep updates separated from the consumingSegments. + return (tableName) -> { + Set<String> updated = consumingSegments.get(tableName); + return updated == null ? null : new HashSet<>(updated); + }; + } +} diff --git a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java index 6301b54d04..e619ba7d70 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java @@ -20,7 +20,11 @@ package org.apache.pinot.server.starter.helix; import com.google.common.collect.ImmutableSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; import java.util.Set; +import java.util.function.Function; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; @@ -42,8 +46,9 @@ public class FreshnessBasedConsumptionStatusCheckerTest { private final long _now; public FakeFreshnessBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, - Set<String> consumingSegments, long minFreshnessMs, long idleTimeoutMs, long now) { - super(instanceDataManager, consumingSegments, minFreshnessMs, idleTimeoutMs); + Map<String, Set<String>> consumingSegments, Function<String, Set<String>> consumingSegmentsSupplier, + long minFreshnessMs, long idleTimeoutMs, long now) { + super(instanceDataManager, consumingSegments, consumingSegmentsSupplier, minFreshnessMs, idleTimeoutMs); _now = now; } @@ -58,10 +63,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest { String segA0 = "tableA__0__0__123Z"; String segA1 = "tableA__1__0__123Z"; String segB0 = "tableB__0__0__123Z"; - Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0); + Map<String, Set<String>> consumingSegments = new HashMap<>(); + consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1)); + consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0)); InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); FreshnessBasedConsumptionStatusChecker statusChecker = - new FreshnessBasedConsumptionStatusChecker(instanceDataManager, consumingSegments, 10000L, 0L); + new FreshnessBasedConsumptionStatusChecker(instanceDataManager, consumingSegments, + ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10000L, 0L); // TableDataManager is not set up yet assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3); @@ -119,6 +127,55 @@ public class FreshnessBasedConsumptionStatusCheckerTest { assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 0); } + @Test + public void testWithDroppedTableAndSegment() + throws InterruptedException { + String segA0 = "tableA__0__0__123Z"; + String segA1 = "tableA__1__0__123Z"; + String segB0 = "tableB__0__0__123Z"; + Map<String, Set<String>> consumingSegments = new HashMap<>(); + consumingSegments.computeIfAbsent("tableA_REALTIME", k -> new HashSet<>()).add(segA0); + consumingSegments.computeIfAbsent("tableA_REALTIME", k -> new HashSet<>()).add(segA1); + consumingSegments.computeIfAbsent("tableB_REALTIME", k -> new HashSet<>()).add(segB0); + InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); + FreshnessBasedConsumptionStatusChecker statusChecker = + new FreshnessBasedConsumptionStatusChecker(instanceDataManager, consumingSegments, + ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 0L); + + // TableDataManager is not set up yet + assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3); + + // setup TableDataMangers + TableDataManager tableDataManagerA = mock(TableDataManager.class); + when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA); + when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(null); + + // setup SegmentDataManagers + RealtimeSegmentDataManager segMngrA0 = mock(RealtimeSegmentDataManager.class); + when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0); + when(tableDataManagerA.acquireSegment(segA1)).thenReturn(null); + + when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20)); + when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0)); + // ensure negative values are ignored + setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE); + + // current offset latest stream offset current time last ingestion time + // segA0 0 20 100 Long.MIN_VALUE + // segA1 (segment is absent) + // segB0 (table is absent) + assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3); + + // updatedConsumingSegments still provide 3 segments to checker but one has caught up. + when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20)); + assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 2); + // Remove the missing segments and check again. + consumingSegments.get("tableA_REALTIME").remove(segA1); + assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 1); + consumingSegments.remove("tableB_REALTIME"); + assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 0); + } + private void setupLatestIngestionTimestamp(RealtimeSegmentDataManager segmentDataManager, long latestIngestionTimestamp) { MutableSegment mockSegment = mock(MutableSegment.class); @@ -133,10 +190,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest { String segA0 = "tableA__0__0__123Z"; String segA1 = "tableA__1__0__123Z"; String segB0 = "tableB__0__0__123Z"; - Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0); + Map<String, Set<String>> consumingSegments = new HashMap<>(); + consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1)); + consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0)); InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); FreshnessBasedConsumptionStatusChecker statusChecker = - new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, consumingSegments, 10L, 0L, 100L); + new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, consumingSegments, + ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 0L, 100L); // TableDataManager is not set up yet assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3); @@ -195,12 +255,14 @@ public class FreshnessBasedConsumptionStatusCheckerTest { String segA0 = "tableA__0__0__123Z"; String segA1 = "tableA__1__0__123Z"; String segB0 = "tableB__0__0__123Z"; - Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0); + Map<String, Set<String>> consumingSegments = new HashMap<>(); + consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1)); + consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0)); InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); long idleTimeoutMs = 10L; FreshnessBasedConsumptionStatusChecker statusChecker = - new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, consumingSegments, 10L, idleTimeoutMs, - 100L); + new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, consumingSegments, + ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, idleTimeoutMs, 100L); // TableDataManager is not set up yet assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3); @@ -270,10 +332,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest { String segA0 = "tableA__0__0__123Z"; String segA1 = "tableA__1__0__123Z"; String segB0 = "tableB__0__0__123Z"; - Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0); + Map<String, Set<String>> consumingSegments = new HashMap<>(); + consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1)); + consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0)); InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); FreshnessBasedConsumptionStatusChecker statusChecker = - new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, consumingSegments, 10L, 0L, 100L); + new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, consumingSegments, + ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 0L, 100L); // TableDataManager is not set up yet assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3); @@ -319,10 +384,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest { String segA0 = "tableA__0__0__123Z"; String segA1 = "tableA__1__0__123Z"; String segB0 = "tableB__0__0__123Z"; - Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0); + Map<String, Set<String>> consumingSegments = new HashMap<>(); + consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1)); + consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0)); InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); FreshnessBasedConsumptionStatusChecker statusChecker = - new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, consumingSegments, 10L, 0L, 100L); + new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, consumingSegments, + ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 0L, 100L); // TableDataManager is not set up yet assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3); @@ -369,6 +437,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest { setupLatestIngestionTimestamp(segMngrA0, 90L); // Unexpected case where latest ingested is somehow after current time setupLatestIngestionTimestamp(segMngrA1, 101L); + assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 1); + consumingSegments.get("tableB_REALTIME").remove(segB0); assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 0); } @@ -377,10 +447,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest { String segA0 = "tableA__0__0__123Z"; String segA1 = "tableA__1__0__123Z"; String segB0 = "tableB__0__0__123Z"; - Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0); + Map<String, Set<String>> consumingSegments = new HashMap<>(); + consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1)); + consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0)); InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); FreshnessBasedConsumptionStatusChecker statusChecker = - new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, consumingSegments, 10L, 0L, 100L); + new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, consumingSegments, + ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 0L, 100L); // TableDataManager is not set up yet assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3); diff --git a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java index 88b05b8ff0..2248f731d2 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java @@ -20,6 +20,8 @@ package org.apache.pinot.server.starter.helix; import com.google.common.collect.ImmutableSet; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; @@ -41,10 +43,13 @@ public class OffsetBasedConsumptionStatusCheckerTest { String segA0 = "tableA__0__0__123Z"; String segA1 = "tableA__1__0__123Z"; String segB0 = "tableB__0__0__123Z"; - Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0); + Map<String, Set<String>> consumingSegments = new HashMap<>(); + consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1)); + consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0)); InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); OffsetBasedConsumptionStatusChecker statusChecker = - new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments); + new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments, + ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments)); // setup TableDataMangers TableDataManager tableDataManagerA = mock(TableDataManager.class); @@ -88,11 +93,14 @@ public class OffsetBasedConsumptionStatusCheckerTest { String segA0 = "tableA__0__0__123Z"; String segA1 = "tableA__1__0__123Z"; String segB0 = "tableB__0__0__123Z"; - Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0); + Map<String, Set<String>> consumingSegments = new HashMap<>(); + consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1)); + consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0)); InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); OffsetBasedConsumptionStatusChecker statusChecker = - new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments); + new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments, + ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments)); // TableDataManager is not set up yet assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3); @@ -149,10 +157,13 @@ public class OffsetBasedConsumptionStatusCheckerTest { String segA0 = "tableA__0__0__123Z"; String segA1 = "tableA__1__0__123Z"; String segB0 = "tableB__0__0__123Z"; - Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0); + Map<String, Set<String>> consumingSegments = new HashMap<>(); + consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1)); + consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0)); InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); OffsetBasedConsumptionStatusChecker statusChecker = - new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments); + new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments, + ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments)); // setup TableDataMangers TableDataManager tableDataManagerA = mock(TableDataManager.class); @@ -190,6 +201,8 @@ public class OffsetBasedConsumptionStatusCheckerTest { // segB0 committed at 1200 1500 when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20)); when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(200)); + assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 1); + consumingSegments.get("tableB_REALTIME").remove(segB0); assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 0); } @@ -199,10 +212,13 @@ public class OffsetBasedConsumptionStatusCheckerTest { String segA0 = "tableA__0__0__123Z"; String segA1 = "tableA__1__0__123Z"; String segB0 = "tableB__0__0__123Z"; - Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0); + Map<String, Set<String>> consumingSegments = new HashMap<>(); + consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1)); + consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0)); InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); OffsetBasedConsumptionStatusChecker statusChecker = - new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments); + new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments, + ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments)); // setup TableDataMangers TableDataManager tableDataManagerA = mock(TableDataManager.class); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org