This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 24e01c1c1b Add Broker Node Level Config: newSegmentExpirationTimeInSeconds (#13686)
24e01c1c1b is described below

commit 24e01c1c1b45efba1063b4bc718b2335ec1da6b8
Author: Pratik Tibrewal <tibrewalpra...@uber.com>
AuthorDate: Fri Jul 26 22:06:02 2024 +0530

    Add Broker Node Level Config: newSegmentExpirationTimeInSeconds (#13686)
---
 .../instanceselector/BalancedInstanceSelector.java   |  5 +++--
 .../instanceselector/BaseInstanceSelector.java       |  9 ++++++---
 .../routing/instanceselector/InstanceSelector.java   |  6 +++++-
 .../instanceselector/InstanceSelectorFactory.java    | 11 +++++++----
 .../MultiStageReplicaGroupSelector.java              |  5 +++--
 .../ReplicaGroupInstanceSelector.java                |  5 +++--
 .../StrictReplicaGroupInstanceSelector.java          |  5 +++--
 .../instanceselector/InstanceSelectorTest.java       | 20 +++++++++++---------
 .../org/apache/pinot/spi/utils/CommonConstants.java  |  4 ++++
 9 files changed, 45 insertions(+), 25 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
index c827369907..9ffa8367b8 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
@@ -50,8 +50,9 @@ public class BalancedInstanceSelector extends 
BaseInstanceSelector {
 
   public BalancedInstanceSelector(String tableNameWithType, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
       BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock,
-      boolean useFixedReplica) {
-    super(tableNameWithType, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock, useFixedReplica);
+      boolean useFixedReplica, long newSegmentExpirationTimeInSeconds) {
+    super(tableNameWithType, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock, useFixedReplica,
+        newSegmentExpirationTimeInSeconds);
   }
 
   @Override
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index 3cef77fac4..4849a60fa0 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -90,6 +90,7 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
   final AdaptiveServerSelector _adaptiveServerSelector;
   final Clock _clock;
   final boolean _useFixedReplica;
+  final long _newSegmentExpirationTimeInSeconds;
   final int _tableNameHashForFixedReplicaRouting;
 
   // These 3 variables are the cached states to help accelerate the change 
processing
@@ -104,13 +105,14 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
 
   BaseInstanceSelector(String tableNameWithType, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
       BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock,
-      boolean useFixedReplica) {
+      boolean useFixedReplica, long newSegmentExpirationTimeInSeconds) {
     _tableNameWithType = tableNameWithType;
     _propertyStore = propertyStore;
     _brokerMetrics = brokerMetrics;
     _adaptiveServerSelector = adaptiveServerSelector;
     _clock = clock;
     _useFixedReplica = useFixedReplica;
+    _newSegmentExpirationTimeInSeconds = newSegmentExpirationTimeInSeconds;
     // Using raw table name to ensure queries spanning across REALTIME and 
OFFLINE tables are routed to the same
     // instance
     // Math.abs(Integer.MIN_VALUE) = Integer.MIN_VALUE, so we use & 0x7FFFFFFF 
to get a positive value
@@ -170,7 +172,7 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
       }
       SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(record);
       long creationTimeMs = 
SegmentUtils.getSegmentCreationTimeMs(segmentZKMetadata);
-      if (InstanceSelector.isNewSegment(creationTimeMs, currentTimeMs)) {
+      if (InstanceSelector.isNewSegment(creationTimeMs, currentTimeMs, 
_newSegmentExpirationTimeInSeconds * 1000)) {
         newSegmentCreationTimeMap.put(segmentZKMetadata.getSegmentName(), 
creationTimeMs);
       }
     }
@@ -400,7 +402,8 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
       long creationTimeMs = 0;
       if (newSegmentState != null) {
         // It was a new segment before, check the creation time and segment 
state to see if it is still a new segment
-        if (InstanceSelector.isNewSegment(newSegmentState.getCreationTimeMs(), 
currentTimeMs)) {
+        if (InstanceSelector.isNewSegment(newSegmentState.getCreationTimeMs(), 
currentTimeMs,
+            _newSegmentExpirationTimeInSeconds * 1000)) {
           creationTimeMs = newSegmentState.getCreationTimeMs();
         }
       } else if (!_oldSegmentCandidatesMap.containsKey(segment)) {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
index d003723c5b..b5fe944bce 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
@@ -36,7 +36,11 @@ public interface InstanceSelector {
   long NEW_SEGMENT_EXPIRATION_MILLIS = TimeUnit.MINUTES.toMillis(5);
 
   static boolean isNewSegment(long creationTimeMs, long currentTimeMs) {
-    return creationTimeMs > 0 && currentTimeMs - creationTimeMs <= 
NEW_SEGMENT_EXPIRATION_MILLIS;
+    return isNewSegment(creationTimeMs, currentTimeMs, 
NEW_SEGMENT_EXPIRATION_MILLIS);
+  }
+
+  static boolean isNewSegment(long creationTimeMs, long currentTimeMs, long 
newSegmentExpirationMillis) {
+    return creationTimeMs > 0 && currentTimeMs - creationTimeMs <= 
newSegmentExpirationMillis;
   }
 
   /**
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
index 2ccb1a9ac1..895731f1dd 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
@@ -63,6 +63,9 @@ public class InstanceSelectorFactory {
     RoutingConfig routingConfig = tableConfig.getRoutingConfig();
     boolean useFixedReplica = 
brokerConfig.getProperty(CommonConstants.Broker.CONFIG_OF_USE_FIXED_REPLICA,
         CommonConstants.Broker.DEFAULT_USE_FIXED_REPLICA);
+    long newSegmentExpirationTimeInSeconds =
+        
brokerConfig.getProperty(CommonConstants.Broker.CONFIG_OF_NEW_SEGMENT_EXPIRATION_SECONDS,
+        
CommonConstants.Broker.DEFAULT_VALUE_OF_NEW_SEGMENT_EXPIRATION_SECONDS);
     if (routingConfig != null) {
       if (routingConfig.getUseFixedReplica() != null) {
         // table config overrides broker config
@@ -74,22 +77,22 @@ public class InstanceSelectorFactory {
           && 
LEGACY_REPLICA_GROUP_REALTIME_ROUTING.equalsIgnoreCase(routingConfig.getRoutingTableBuilderName())))
 {
         LOGGER.info("Using ReplicaGroupInstanceSelector for table: {}", 
tableNameWithType);
         return new ReplicaGroupInstanceSelector(tableNameWithType, 
propertyStore, brokerMetrics, adaptiveServerSelector,
-            clock, useFixedReplica);
+            clock, useFixedReplica, newSegmentExpirationTimeInSeconds);
       }
       if 
(RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
           routingConfig.getInstanceSelectorType())) {
         LOGGER.info("Using StrictReplicaGroupInstanceSelector for table: {}", 
tableNameWithType);
         return new StrictReplicaGroupInstanceSelector(tableNameWithType, 
propertyStore, brokerMetrics,
-            adaptiveServerSelector, clock, useFixedReplica);
+            adaptiveServerSelector, clock, useFixedReplica, 
newSegmentExpirationTimeInSeconds);
       }
       if 
(RoutingConfig.MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE.equalsIgnoreCase(
           routingConfig.getInstanceSelectorType())) {
         LOGGER.info("Using {} for table: {}", 
routingConfig.getInstanceSelectorType(), tableNameWithType);
         return new MultiStageReplicaGroupSelector(tableNameWithType, 
propertyStore, brokerMetrics,
-            adaptiveServerSelector, clock, useFixedReplica);
+            adaptiveServerSelector, clock, useFixedReplica, 
newSegmentExpirationTimeInSeconds);
       }
     }
     return new BalancedInstanceSelector(tableNameWithType, propertyStore, 
brokerMetrics, adaptiveServerSelector, clock,
-        useFixedReplica);
+        useFixedReplica, newSegmentExpirationTimeInSeconds);
   }
 }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
index b27450426d..22dfc8096a 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
@@ -59,8 +59,9 @@ public class MultiStageReplicaGroupSelector extends 
BaseInstanceSelector {
 
   public MultiStageReplicaGroupSelector(String tableNameWithType, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
       BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock,
-      boolean useFixedReplica) {
-    super(tableNameWithType, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock, useFixedReplica);
+      boolean useFixedReplica, long newSegmentExpirationTimeInSeconds) {
+    super(tableNameWithType, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock, useFixedReplica,
+        newSegmentExpirationTimeInSeconds);
   }
 
   @Override
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
index 0e9bd52d42..c766791f2d 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
@@ -65,8 +65,9 @@ public class ReplicaGroupInstanceSelector extends 
BaseInstanceSelector {
 
   public ReplicaGroupInstanceSelector(String tableNameWithType, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
       BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock,
-      boolean useFixedReplica) {
-    super(tableNameWithType, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock, useFixedReplica);
+      boolean useFixedReplica, long newSegmentExpirationTimeInSeconds) {
+    super(tableNameWithType, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock, useFixedReplica,
+        newSegmentExpirationTimeInSeconds);
   }
 
   @Override
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
index 95e83ea31c..a8fae76ef1 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
@@ -72,8 +72,9 @@ public class StrictReplicaGroupInstanceSelector extends 
ReplicaGroupInstanceSele
 
   public StrictReplicaGroupInstanceSelector(String tableNameWithType, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
       BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock,
-      boolean useFixedReplica) {
-    super(tableNameWithType, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock, useFixedReplica);
+      boolean useFixedReplica, long newSegmentExpirationTimeInSeconds) {
+    super(tableNameWithType, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock, useFixedReplica,
+        newSegmentExpirationTimeInSeconds);
   }
 
   /**
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
index f4edc28e67..cc4f355369 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
@@ -234,13 +234,14 @@ public class InstanceSelectorTest {
     ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
     BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
     BalancedInstanceSelector balancedInstanceSelector =
-        new BalancedInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC(), false);
+        new BalancedInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC(),
+            false, 300);
     ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
         new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC(),
-            false);
+            false, 300);
     StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector =
         new StrictReplicaGroupInstanceSelector(offlineTableName, 
propertyStore, brokerMetrics, null, Clock.systemUTC(),
-            false);
+            false, 300);
 
     Set<String> enabledInstances = new HashSet<>();
     IdealState idealState = new IdealState(offlineTableName);
@@ -761,7 +762,7 @@ public class InstanceSelectorTest {
 
     ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
         new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC(),
-            false);
+            false, 300);
 
     Set<String> enabledInstances = new HashSet<>();
     IdealState idealState = new IdealState(offlineTableName);
@@ -844,7 +845,7 @@ public class InstanceSelectorTest {
 
     ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
         new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC(),
-            false);
+            false, 300);
 
     Set<String> enabledInstances = new HashSet<>();
     IdealState idealState = new IdealState(offlineTableName);
@@ -927,7 +928,7 @@ public class InstanceSelectorTest {
 
     ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
         new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC(),
-            false);
+            false, 300);
 
     Set<String> enabledInstances = new HashSet<>();
     IdealState idealState = new IdealState(offlineTableName);
@@ -1001,7 +1002,7 @@ public class InstanceSelectorTest {
 
     MultiStageReplicaGroupSelector multiStageSelector =
         new MultiStageReplicaGroupSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC(),
-            false);
+            false, 300);
     multiStageSelector = spy(multiStageSelector);
     
doReturn(instancePartitions).when(multiStageSelector).getInstancePartitions();
 
@@ -1096,11 +1097,12 @@ public class InstanceSelectorTest {
     ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
     BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
     BalancedInstanceSelector balancedInstanceSelector =
-        new BalancedInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC(), false);
+        new BalancedInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC(),
+            false, 300);
     // ReplicaGroupInstanceSelector has the same behavior as 
BalancedInstanceSelector for the unavailable segments
     StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector =
         new StrictReplicaGroupInstanceSelector(offlineTableName, 
propertyStore, brokerMetrics, null, Clock.systemUTC(),
-            false);
+            false, 300);
 
     Set<String> enabledInstances = new HashSet<>();
     IdealState idealState = new IdealState(offlineTableName);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 3753c0e64b..b6fec05b31 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.math.BigDecimal;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.apache.pinot.spi.config.instance.InstanceType;
 
 
@@ -344,6 +345,9 @@ public class CommonConstants {
     // precedence over "query.response.size" (i.e., "query.response.size" will 
be ignored).
     public static final String CONFIG_OF_MAX_SERVER_RESPONSE_SIZE_BYTES = 
"pinot.broker.max.server.response.size.bytes";
 
+    public static final String CONFIG_OF_NEW_SEGMENT_EXPIRATION_SECONDS = 
"pinot.broker.new.segment.expiration.seconds";
+    public static final long DEFAULT_VALUE_OF_NEW_SEGMENT_EXPIRATION_SECONDS = 
TimeUnit.MINUTES.toSeconds(5);
+
     public static class Request {
       public static final String SQL = "sql";
       public static final String TRACE = "trace";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to