This is an automated email from the ASF dual-hosted git repository. ankitsultana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 24e01c1c1b Add Broker Node Level Config: newSegmentExpirationTimeInSeconds (#13686) 24e01c1c1b is described below commit 24e01c1c1b45efba1063b4bc718b2335ec1da6b8 Author: Pratik Tibrewal <tibrewalpra...@uber.com> AuthorDate: Fri Jul 26 22:06:02 2024 +0530 Add Broker Node Level Config: newSegmentExpirationTimeInSeconds (#13686) --- .../instanceselector/BalancedInstanceSelector.java | 5 +++-- .../instanceselector/BaseInstanceSelector.java | 9 ++++++--- .../routing/instanceselector/InstanceSelector.java | 6 +++++- .../instanceselector/InstanceSelectorFactory.java | 11 +++++++---- .../MultiStageReplicaGroupSelector.java | 5 +++-- .../ReplicaGroupInstanceSelector.java | 5 +++-- .../StrictReplicaGroupInstanceSelector.java | 5 +++-- .../instanceselector/InstanceSelectorTest.java | 20 +++++++++++--------- .../org/apache/pinot/spi/utils/CommonConstants.java | 4 ++++ 9 files changed, 45 insertions(+), 25 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java index c827369907..9ffa8367b8 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java @@ -50,8 +50,9 @@ public class BalancedInstanceSelector extends BaseInstanceSelector { public BalancedInstanceSelector(String tableNameWithType, ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock, - boolean useFixedReplica) { - super(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector, clock, useFixedReplica); + boolean useFixedReplica, long newSegmentExpirationTimeInSeconds) { + super(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector, clock, useFixedReplica, + newSegmentExpirationTimeInSeconds); } @Override diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java index 3cef77fac4..4849a60fa0 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java @@ -90,6 +90,7 @@ abstract class BaseInstanceSelector implements InstanceSelector { final AdaptiveServerSelector _adaptiveServerSelector; final Clock _clock; final boolean _useFixedReplica; + final long _newSegmentExpirationTimeInSeconds; final int _tableNameHashForFixedReplicaRouting; // These 3 variables are the cached states to help accelerate the change processing @@ -104,13 +105,14 @@ abstract class BaseInstanceSelector implements InstanceSelector { BaseInstanceSelector(String tableNameWithType, ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock, - boolean useFixedReplica) { + boolean useFixedReplica, long newSegmentExpirationTimeInSeconds) { _tableNameWithType = tableNameWithType; _propertyStore = propertyStore; _brokerMetrics = brokerMetrics; _adaptiveServerSelector = adaptiveServerSelector; _clock = clock; _useFixedReplica = useFixedReplica; + _newSegmentExpirationTimeInSeconds = newSegmentExpirationTimeInSeconds; // Using raw table name to ensure queries spanning across REALTIME and OFFLINE tables are routed to the same // instance // Math.abs(Integer.MIN_VALUE) = Integer.MIN_VALUE, so we use & 0x7FFFFFFF to get a positive value @@ -170,7 +172,7 @@ abstract class BaseInstanceSelector implements InstanceSelector { } SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(record); long creationTimeMs = SegmentUtils.getSegmentCreationTimeMs(segmentZKMetadata); - if (InstanceSelector.isNewSegment(creationTimeMs, currentTimeMs)) { + if (InstanceSelector.isNewSegment(creationTimeMs, currentTimeMs, _newSegmentExpirationTimeInSeconds * 1000)) { newSegmentCreationTimeMap.put(segmentZKMetadata.getSegmentName(), creationTimeMs); } } @@ -400,7 +402,8 @@ abstract class BaseInstanceSelector implements InstanceSelector { long creationTimeMs = 0; if (newSegmentState != null) { // It was a new segment before, check the creation time and segment state to see if it is still a new segment - if (InstanceSelector.isNewSegment(newSegmentState.getCreationTimeMs(), currentTimeMs)) { + if (InstanceSelector.isNewSegment(newSegmentState.getCreationTimeMs(), currentTimeMs, + _newSegmentExpirationTimeInSeconds * 1000)) { creationTimeMs = newSegmentState.getCreationTimeMs(); } } else if (!_oldSegmentCandidatesMap.containsKey(segment)) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java index d003723c5b..b5fe944bce 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java @@ -36,7 +36,11 @@ public interface InstanceSelector { long NEW_SEGMENT_EXPIRATION_MILLIS = TimeUnit.MINUTES.toMillis(5); static boolean isNewSegment(long creationTimeMs, long currentTimeMs) { - return creationTimeMs > 0 && currentTimeMs - creationTimeMs <= NEW_SEGMENT_EXPIRATION_MILLIS; + return isNewSegment(creationTimeMs, currentTimeMs, NEW_SEGMENT_EXPIRATION_MILLIS); + } + + static boolean isNewSegment(long creationTimeMs, long currentTimeMs, long newSegmentExpirationMillis) { + return creationTimeMs > 0 && currentTimeMs - creationTimeMs <= newSegmentExpirationMillis; } /** diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java index 2ccb1a9ac1..895731f1dd 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java @@ -63,6 +63,9 @@ public class InstanceSelectorFactory { RoutingConfig routingConfig = tableConfig.getRoutingConfig(); boolean useFixedReplica = brokerConfig.getProperty(CommonConstants.Broker.CONFIG_OF_USE_FIXED_REPLICA, CommonConstants.Broker.DEFAULT_USE_FIXED_REPLICA); + long newSegmentExpirationTimeInSeconds = + brokerConfig.getProperty(CommonConstants.Broker.CONFIG_OF_NEW_SEGMENT_EXPIRATION_SECONDS, + CommonConstants.Broker.DEFAULT_VALUE_OF_NEW_SEGMENT_EXPIRATION_SECONDS); if (routingConfig != null) { if (routingConfig.getUseFixedReplica() != null) { // table config overrides broker config @@ -74,22 +77,22 @@ public class InstanceSelectorFactory { && LEGACY_REPLICA_GROUP_REALTIME_ROUTING.equalsIgnoreCase(routingConfig.getRoutingTableBuilderName()))) { LOGGER.info("Using ReplicaGroupInstanceSelector for table: {}", tableNameWithType); return new ReplicaGroupInstanceSelector(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector, - clock, useFixedReplica); + clock, useFixedReplica, newSegmentExpirationTimeInSeconds); } if (RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase( routingConfig.getInstanceSelectorType())) { LOGGER.info("Using StrictReplicaGroupInstanceSelector for table: {}", tableNameWithType); return new StrictReplicaGroupInstanceSelector(tableNameWithType, propertyStore, brokerMetrics, - adaptiveServerSelector, clock, useFixedReplica); + adaptiveServerSelector, clock, useFixedReplica, newSegmentExpirationTimeInSeconds); } if (RoutingConfig.MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE.equalsIgnoreCase( routingConfig.getInstanceSelectorType())) { LOGGER.info("Using {} for table: {}", routingConfig.getInstanceSelectorType(), tableNameWithType); return new MultiStageReplicaGroupSelector(tableNameWithType, propertyStore, brokerMetrics, - adaptiveServerSelector, clock, useFixedReplica); + adaptiveServerSelector, clock, useFixedReplica, newSegmentExpirationTimeInSeconds); } } return new BalancedInstanceSelector(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector, clock, - useFixedReplica); + useFixedReplica, newSegmentExpirationTimeInSeconds); } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java index b27450426d..22dfc8096a 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java @@ -59,8 +59,9 @@ public class MultiStageReplicaGroupSelector extends BaseInstanceSelector { public MultiStageReplicaGroupSelector(String tableNameWithType, ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock, - boolean useFixedReplica) { - super(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector, clock, useFixedReplica); + boolean useFixedReplica, long newSegmentExpirationTimeInSeconds) { + super(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector, clock, useFixedReplica, + newSegmentExpirationTimeInSeconds); } @Override diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java index 0e9bd52d42..c766791f2d 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java @@ -65,8 +65,9 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector { public ReplicaGroupInstanceSelector(String tableNameWithType, ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock, - boolean useFixedReplica) { - super(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector, clock, useFixedReplica); + boolean useFixedReplica, long newSegmentExpirationTimeInSeconds) { + super(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector, clock, useFixedReplica, + newSegmentExpirationTimeInSeconds); } @Override diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java index 95e83ea31c..a8fae76ef1 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java @@ -72,8 +72,9 @@ public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSele public StrictReplicaGroupInstanceSelector(String tableNameWithType, ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock, - boolean useFixedReplica) { - super(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector, clock, useFixedReplica); + boolean useFixedReplica, long newSegmentExpirationTimeInSeconds) { + super(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector, clock, useFixedReplica, + newSegmentExpirationTimeInSeconds); } /** diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java index f4edc28e67..cc4f355369 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java @@ -234,13 +234,14 @@ public class InstanceSelectorTest { ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); BrokerMetrics brokerMetrics = mock(BrokerMetrics.class); BalancedInstanceSelector balancedInstanceSelector = - new BalancedInstanceSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(), false); + new BalancedInstanceSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(), + false, 300); ReplicaGroupInstanceSelector replicaGroupInstanceSelector = new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(), - false); + false, 300); StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector = new StrictReplicaGroupInstanceSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(), - false); + false, 300); Set<String> enabledInstances = new HashSet<>(); IdealState idealState = new IdealState(offlineTableName); @@ -761,7 +762,7 @@ public class InstanceSelectorTest { ReplicaGroupInstanceSelector replicaGroupInstanceSelector = new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(), - false); + false, 300); Set<String> enabledInstances = new HashSet<>(); IdealState idealState = new IdealState(offlineTableName); @@ -844,7 +845,7 @@ public class InstanceSelectorTest { ReplicaGroupInstanceSelector replicaGroupInstanceSelector = new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(), - false); + false, 300); Set<String> enabledInstances = new HashSet<>(); IdealState idealState = new IdealState(offlineTableName); @@ -927,7 +928,7 @@ public class InstanceSelectorTest { ReplicaGroupInstanceSelector replicaGroupInstanceSelector = new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(), - false); + false, 300); Set<String> enabledInstances = new HashSet<>(); IdealState idealState = new IdealState(offlineTableName); @@ -1001,7 +1002,7 @@ public class InstanceSelectorTest { MultiStageReplicaGroupSelector multiStageSelector = new MultiStageReplicaGroupSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(), - false); + false, 300); multiStageSelector = spy(multiStageSelector); doReturn(instancePartitions).when(multiStageSelector).getInstancePartitions(); @@ -1096,11 +1097,12 @@ public class InstanceSelectorTest { ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); BrokerMetrics brokerMetrics = mock(BrokerMetrics.class); BalancedInstanceSelector balancedInstanceSelector = - new BalancedInstanceSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(), false); + new BalancedInstanceSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(), + false, 300); // ReplicaGroupInstanceSelector has the same behavior as BalancedInstanceSelector for the unavailable segments StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector = new StrictReplicaGroupInstanceSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(), - false); + false, 300); Set<String> enabledInstances = new HashSet<>(); IdealState idealState = new IdealState(offlineTableName); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 3753c0e64b..b6fec05b31 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import java.io.File; import java.math.BigDecimal; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.pinot.spi.config.instance.InstanceType; @@ -344,6 +345,9 @@ public class CommonConstants { // precedence over "query.response.size" (i.e., "query.response.size" will be ignored). public static final String CONFIG_OF_MAX_SERVER_RESPONSE_SIZE_BYTES = "pinot.broker.max.server.response.size.bytes"; + public static final String CONFIG_OF_NEW_SEGMENT_EXPIRATION_SECONDS = "pinot.broker.new.segment.expiration.seconds"; + public static final long DEFAULT_VALUE_OF_NEW_SEGMENT_EXPIRATION_SECONDS = TimeUnit.MINUTES.toSeconds(5); + public static class Request { public static final String SQL = "sql"; public static final String TRACE = "trace"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org