This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch llc_routing_hotfix
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 77954eccad9e4e5086c7edaa41235ef823a91d1a
Author: Jackie (Xiaotian) Jiang <xaji...@linkedin.com>
AuthorDate: Tue Jun 18 18:34:55 2019 -0700

    Within a partition, only allow querying the first CONSUMING segment for the 
real-time table routing
    
    Within a partition, we only allow querying the first CONSUMING segment 
(segment with instances in CONSUMING
    state) for the following reasons:
    - If within a partition, there are multiple CONSUMING segments (typically 
caused by the delay of CONSUMING to
      ONLINE state transition), we can only query the first CONSUMING segment 
because it might contain records that
      overlap with the records in the next segment (over-consumed).
    - If the instance states for a segment are partially ONLINE and partially 
CONSUMING (some instances finished the
      CONSUMING to ONLINE state transition, others didn't), we count the 
segment as CONSUMING segment (most likely
      the first CONSUMING segment because it is already committed and is 
performing the CONSUMING to ONLINE state
      transition). If we don't count the segment as CONSUMING segment, then 
this segment is not allowed to be in the
      CONSUMING state for routing purpose, and we will route all queries to the 
ONLINE instances which can
      potentially overwhelm those instances.
    - It is possible that the latest CONSUMING segment is not allowed for 
routing purpose and we won't query it, but
      it should only last for a short period of time. Once the older CONSUMING 
segment becomes ONLINE (all instances
      finished the CONSUMING to ONLINE state transition), the latest CONSUMING 
segment will become the first
      CONSUMING segment and will be allowed for routing purpose.
---
 .../LowLevelConsumerRoutingTableBuilder.java       |  6 +-
 .../builder/LowLevelRoutingTableBuilderUtil.java   | 76 ++++++++++------------
 2 files changed, 36 insertions(+), 46 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilder.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilder.java
index f6c40b2..4a39da2 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilder.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilder.java
@@ -122,9 +122,9 @@ public class LowLevelConsumerRoutingTableBuilder extends 
GeneratorBasedRoutingTa
             continue;
           }
 
-          // Replicas in CONSUMING state are only allowed on the last segment
-          if 
(state.equalsIgnoreCase(CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING)
-              && segmentName.equals(validConsumingSegment)) {
+          // If the server is in CONSUMING status, the segment has to match the 
valid consuming segment
+          if 
(state.equals(CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING)
+              && validConsumingSegment != null && 
segmentNameStr.equals(validConsumingSegment.getSegmentName())) {
             validServers.add(instance);
           }
         }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelRoutingTableBuilderUtil.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelRoutingTableBuilderUtil.java
index 8490397..3ccf841 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelRoutingTableBuilderUtil.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelRoutingTableBuilderUtil.java
@@ -22,7 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedSet;
 import org.apache.helix.model.ExternalView;
-import org.apache.pinot.common.utils.CommonConstants;
+import 
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
 import org.apache.pinot.common.utils.SegmentName;
 
 
@@ -32,7 +32,30 @@ import org.apache.pinot.common.utils.SegmentName;
 public class LowLevelRoutingTableBuilderUtil {
 
   /**
-   * Compute the map of allowed 'consuming' segments for each partition.
+   * Compute the map of allowed CONSUMING segments for each partition for 
routing purpose.
+   * <p>Within a partition, we only allow querying the first CONSUMING segment 
(segment with instances in CONSUMING
+   * state) for the following reasons:
+   * <ul>
+   *   <li>
+   *     If within a partition, there are multiple CONSUMING segments 
(typically caused by the delay of CONSUMING to
+   *     ONLINE state transition), we can only query the first CONSUMING 
segment because it might contain records that
+   *     overlap with the records in the next segment (over-consumed).
+   *   </li>
+   *   <li>
+   *     If the instance states for a segment are partially ONLINE and partially 
CONSUMING (some instances finished the
+   *     CONSUMING to ONLINE state transition, others didn't), we count the 
segment as CONSUMING segment (most likely
+   *     the first CONSUMING segment because it is already committed and is 
performing the CONSUMING to ONLINE state
+   *     transition). If we don't count the segment as CONSUMING segment, then 
this segment is not allowed to be in the
+   *     CONSUMING state for routing purpose, and we will route all queries to 
the ONLINE instances which can
+   *     potentially overwhelm those instances.
+   *   </li>
+   *   <li>
+   *     It is possible that the latest CONSUMING segment is not allowed for 
routing purpose and we won't query it, but
+   *     it should only last for a short period of time. Once the older 
CONSUMING segment becomes ONLINE (all instances
+   *     finished the CONSUMING to ONLINE state transition), the latest 
CONSUMING segment will become the first
+   *     CONSUMING segment and will be allowed for routing purpose.
+   *   </li>
+   * </ul>
    *
    * @param externalView helix external view
    * @param sortedSegmentsByPartition map of partition to sorted set of 
segment names.
@@ -40,51 +63,18 @@ public class LowLevelRoutingTableBuilderUtil {
    */
   public static Map<String, SegmentName> 
getAllowedConsumingStateSegments(ExternalView externalView,
       Map<String, SortedSet<SegmentName>> sortedSegmentsByPartition) {
-    Map<String, SegmentName> allowedSegmentInConsumingStateByPartition = new 
HashMap<>();
-    for (String partition : sortedSegmentsByPartition.keySet()) {
-      SortedSet<SegmentName> sortedSegmentsForPartition = 
sortedSegmentsByPartition.get(partition);
-      SegmentName lastAllowedSegmentInConsumingState = null;
-
+    Map<String, SegmentName> allowedConsumingSegments = new HashMap<>();
+    for (Map.Entry<String, SortedSet<SegmentName>> entry : 
sortedSegmentsByPartition.entrySet()) {
+      String partitionId = entry.getKey();
+      SortedSet<SegmentName> sortedSegmentsForPartition = entry.getValue();
       for (SegmentName segmentName : sortedSegmentsForPartition) {
-        Map<String, String> helixPartitionState = 
externalView.getStateMap(segmentName.getSegmentName());
-        boolean allInConsumingState = true;
-        int replicasInConsumingState = 0;
-
-        // Only keep the segment if all replicas have it in CONSUMING state
-        for (String externalViewState : helixPartitionState.values()) {
-          // Ignore ERROR state
-          if (externalViewState
-              
.equalsIgnoreCase(CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ERROR))
 {
-            continue;
-          }
-
-          // Not all segments are in CONSUMING state, therefore don't consider 
the last segment assignable to CONSUMING
-          // replicas
-          if (externalViewState
-              
.equalsIgnoreCase(CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ONLINE))
 {
-            allInConsumingState = false;
-            break;
-          }
-
-          // Otherwise count the replica as being in CONSUMING state
-          if (externalViewState
-              
.equalsIgnoreCase(CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING))
 {
-            replicasInConsumingState++;
-          }
-        }
-
-        // If all replicas have this segment in consuming state (and not all 
of them are in ERROR state), then pick this
-        // segment to be the last allowed segment to be in CONSUMING state
-        if (allInConsumingState && 0 < replicasInConsumingState) {
-          lastAllowedSegmentInConsumingState = segmentName;
+        if (externalView.getStateMap(segmentName.getSegmentName())
+            .containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
+          allowedConsumingSegments.put(partitionId, segmentName);
           break;
         }
       }
-
-      if (lastAllowedSegmentInConsumingState != null) {
-        allowedSegmentInConsumingStateByPartition.put(partition, 
lastAllowedSegmentInConsumingState);
-      }
     }
-    return allowedSegmentInConsumingStateByPartition;
+    return allowedConsumingSegments;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to