Jackie-Jiang commented on a change in pull request #7709:
URL: https://github.com/apache/pinot/pull/7709#discussion_r744013132



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/BinaryOperatorTransformFunction.java
##########
@@ -71,218 +113,331 @@ public TransformResultMetadata getResultMetadata() {
   }
 
   private void fillResultArray(ProjectionBlock projectionBlock) {
-    if (_results == null) {
-      _results = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
-    }
     int length = projectionBlock.getNumDocs();
+    if (_results == null || _results.length < length) {
+      _results = new int[length];
+    }
     switch (_leftStoredType) {
       case INT:
-        int[] leftIntValues = 
_leftTransformFunction.transformToIntValuesSV(projectionBlock);
-        switch (_rightStoredType) {
-          case INT:
-            int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Integer.compare(leftIntValues[i], 
rightIntValues[i]));
-            }
-            break;
-          case LONG:
-            long[] rightLongValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Long.compare(leftIntValues[i], 
rightLongValues[i]));
-            }
-            break;
-          case FLOAT:
-            float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftIntValues[i], 
rightFloatValues[i]));
-            }
-            break;
-          case DOUBLE:
-            double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftIntValues[i], 
rightDoubleValues[i]));
-            }
-            break;
-          case STRING:
-            String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              try {
-                _results[i] =
-                    
getIntResult(BigDecimal.valueOf(leftIntValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
-              } catch (NumberFormatException e) {
-                _results[i] = 0;
-              }
-            }
-            break;
-          default:
-            throw new IllegalStateException(String.format(
-                "Unsupported data type for comparison: [Left Transform 
Function [%s] result type is [%s], Right "
-                    + "Transform Function [%s] result type is [%s]]", 
_leftTransformFunction.getName(), _leftStoredType,
-                _rightTransformFunction.getName(), _rightStoredType));
-        }
+        fillResultInt(projectionBlock, length);
         break;
       case LONG:
-        long[] leftLongValues = 
_leftTransformFunction.transformToLongValuesSV(projectionBlock);
-        switch (_rightStoredType) {
-          case INT:
-            int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Long.compare(leftLongValues[i], 
rightIntValues[i]));
-            }
-            break;
-          case LONG:
-            long[] rightLongValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Long.compare(leftLongValues[i], 
rightLongValues[i]));
-            }
-            break;
-          case FLOAT:
-            float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(
-                  
BigDecimal.valueOf(leftLongValues[i]).compareTo(BigDecimal.valueOf(rightFloatValues[i])));
-            }
-            break;
-          case DOUBLE:
-            double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(
-                  
BigDecimal.valueOf(leftLongValues[i]).compareTo(BigDecimal.valueOf(rightDoubleValues[i])));
-            }
-            break;
-          case STRING:
-            String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              try {
-                _results[i] =
-                    
getIntResult(BigDecimal.valueOf(leftLongValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
-              } catch (NumberFormatException e) {
-                _results[i] = 0;
-              }
-            }
-            break;
-          default:
-            throw new IllegalStateException(String.format(
-                "Unsupported data type for comparison: [Left Transform 
Function [%s] result type is [%s], Right "
-                    + "Transform Function [%s] result type is [%s]]", 
_leftTransformFunction.getName(), _leftStoredType,
-                _rightTransformFunction.getName(), _rightStoredType));
-        }
+        fillResultLong(projectionBlock, length);
         break;
       case FLOAT:
-        float[] leftFloatValues = 
_leftTransformFunction.transformToFloatValuesSV(projectionBlock);
-        switch (_rightStoredType) {
-          case INT:
-            int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftFloatValues[i], 
rightIntValues[i]));
-            }
-            break;
-          case LONG:
-            long[] rightLongValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(
-                  
BigDecimal.valueOf(leftFloatValues[i]).compareTo(BigDecimal.valueOf(rightLongValues[i])));
-            }
-            break;
-          case FLOAT:
-            float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Float.compare(leftFloatValues[i], 
rightFloatValues[i]));
-            }
-            break;
-          case DOUBLE:
-            double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftFloatValues[i], 
rightDoubleValues[i]));
-            }
-            break;
-          case STRING:
-            String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              try {
-                _results[i] = getIntResult(
-                    BigDecimal.valueOf(leftFloatValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
-              } catch (NumberFormatException e) {
-                _results[i] = 0;
-              }
-            }
-            break;
-          default:
-            throw new IllegalStateException(String.format(
-                "Unsupported data type for comparison: [Left Transform 
Function [%s] result type is [%s], Right "
-                    + "Transform Function [%s] result type is [%s]]", 
_leftTransformFunction.getName(), _leftStoredType,
-                _rightTransformFunction.getName(), _rightStoredType));
-        }
+        fillResultFloat(projectionBlock, length);
         break;
       case DOUBLE:
-        double[] leftDoubleValues = 
_leftTransformFunction.transformToDoubleValuesSV(projectionBlock);
-        switch (_rightStoredType) {
-          case INT:
-            int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftDoubleValues[i], 
rightIntValues[i]));
-            }
-            break;
-          case LONG:
-            long[] rightLongValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(
-                  
BigDecimal.valueOf(leftDoubleValues[i]).compareTo(BigDecimal.valueOf(rightLongValues[i])));
-            }
-            break;
-          case FLOAT:
-            float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftDoubleValues[i], 
rightFloatValues[i]));
-            }
-            break;
-          case DOUBLE:
-            double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftDoubleValues[i], 
rightDoubleValues[i]));
-            }
-            break;
-          case STRING:
-            String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              try {
-                _results[i] = getIntResult(
-                    BigDecimal.valueOf(leftDoubleValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
-              } catch (NumberFormatException e) {
-                _results[i] = 0;
-              }
-            }
-            break;
-          default:
-            throw new IllegalStateException(String.format(
-                "Unsupported data type for comparison: [Left Transform 
Function [%s] result type is [%s], Right "
-                    + "Transform Function [%s] result type is [%s]]", 
_leftTransformFunction.getName(), _leftStoredType,
-                _rightTransformFunction.getName(), _rightStoredType));
-        }
+        fillResultDouble(projectionBlock, length);
         break;
       case STRING:
-        String[] leftStringValues = 
_leftTransformFunction.transformToStringValuesSV(projectionBlock);
-        String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
-        for (int i = 0; i < length; i++) {
-          _results[i] = 
getIntResult(leftStringValues[i].compareTo(rightStringValues[i]));
-        }
+        fillResultString(projectionBlock, length);
         break;
       case BYTES:
-        byte[][] leftBytesValues = 
_leftTransformFunction.transformToBytesValuesSV(projectionBlock);
-        byte[][] rightBytesValues = 
_rightTransformFunction.transformToBytesValuesSV(projectionBlock);
-        for (int i = 0; i < length; i++) {
-          _results[i] = getIntResult((new 
ByteArray(leftBytesValues[i])).compareTo(new ByteArray(rightBytesValues[i])));
-        }
+        fillResultBytes(projectionBlock, length);
         break;
       // NOTE: Multi-value columns are not comparable, so we should not reach 
here
       default:
-        throw new IllegalStateException();
+        throw illegalState();
+    }
+  }
+
+  private void fillResultInt(ProjectionBlock projectionBlock, int length) {
+    int[] leftIntValues = 
_leftTransformFunction.transformToIntValuesSV(projectionBlock);
+    switch (_rightStoredType) {
+      case INT:
+        fillIntResultArray(projectionBlock, leftIntValues, length);
+        break;
+      case LONG:
+        fillLongResultArray(projectionBlock, leftIntValues, length);
+        break;
+      case FLOAT:
+        fillFloatResultArray(projectionBlock, leftIntValues, length);
+        break;
+      case DOUBLE:
+        fillDoubleResultArray(projectionBlock, leftIntValues, length);
+        break;
+      case STRING:
+        fillStringResultArray(projectionBlock, leftIntValues, length);
+        break;
+      default:
+        throw illegalState();
+    }
+  }
+
+  private void fillResultLong(ProjectionBlock projectionBlock, int length) {
+    long[] leftLongValues = 
_leftTransformFunction.transformToLongValuesSV(projectionBlock);
+    switch (_rightStoredType) {
+      case INT:
+        fillIntResultArray(projectionBlock, leftLongValues, length);
+        break;
+      case LONG:
+        fillLongResultArray(projectionBlock, leftLongValues, length);
+        break;
+      case FLOAT:
+        fillFloatResultArray(projectionBlock, leftLongValues, length);
+        break;
+      case DOUBLE:
+        fillDoubleResultArray(projectionBlock, leftLongValues, length);
+        break;
+      case STRING:
+        fillStringResultArray(projectionBlock, leftLongValues, length);
+        break;
+      default:
+        throw illegalState();
+    }
+  }
+
+  private void fillResultFloat(ProjectionBlock projectionBlock, int length) {
+    float[] leftFloatValues = 
_leftTransformFunction.transformToFloatValuesSV(projectionBlock);
+    switch (_rightStoredType) {
+      case INT:
+        fillIntResultArray(projectionBlock, leftFloatValues, length);
+        break;
+      case LONG:
+        fillLongResultArray(projectionBlock, leftFloatValues, length);
+        break;
+      case FLOAT:
+        fillFloatResultArray(projectionBlock, leftFloatValues, length);
+        break;
+      case DOUBLE:
+        fillDoubleResultArray(projectionBlock, leftFloatValues, length);
+        break;
+      case STRING:
+        fillStringResultArray(projectionBlock, leftFloatValues, length);
+        break;
+      default:
+        throw illegalState();
+    }
+  }
+
+  private void fillResultDouble(ProjectionBlock projectionBlock, int length) {
+    double[] leftDoubleValues = 
_leftTransformFunction.transformToDoubleValuesSV(projectionBlock);
+    switch (_rightStoredType) {
+      case INT:
+        fillIntResultArray(projectionBlock, leftDoubleValues, length);
+        break;
+      case LONG:
+        fillLongResultArray(projectionBlock, leftDoubleValues, length);
+        break;
+      case FLOAT:
+        fillFloatResultArray(projectionBlock, leftDoubleValues, length);
+        break;
+      case DOUBLE:
+        fillDoubleResultArray(projectionBlock, leftDoubleValues, length);
+        break;
+      case STRING:
+        fillStringResultArray(projectionBlock, leftDoubleValues, length);
+        break;
+      default:
+        throw illegalState();
+    }
+  }
+
+  private IllegalStateException illegalState() {
+    throw new IllegalStateException(String.format(
+        "Unsupported data type for comparison: [Left Transform Function [%s] 
result type is [%s], Right "
+            + "Transform Function [%s] result type is [%s]]", 
_leftTransformFunction.getName(), _leftStoredType,
+        _rightTransformFunction.getName(), _rightStoredType));
+  }
+
+  private void fillResultString(ProjectionBlock projectionBlock, int length) {
+    String[] leftStringValues = 
_leftTransformFunction.transformToStringValuesSV(projectionBlock);
+    String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = 
getIntResult(leftStringValues[i].compareTo(rightStringValues[i]));
+    }
+  }
+
+  private void fillResultBytes(ProjectionBlock projectionBlock, int length) {
+    byte[][] leftBytesValues = 
_leftTransformFunction.transformToBytesValuesSV(projectionBlock);
+    byte[][] rightBytesValues = 
_rightTransformFunction.transformToBytesValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult((new 
ByteArray(leftBytesValues[i])).compareTo(new ByteArray(rightBytesValues[i])));
+    }
+  }
+
+  private void fillIntResultArray(ProjectionBlock projectionBlock, int[] 
leftIntValues, int length) {
+    int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Integer.compare(leftIntValues[i], 
rightIntValues[i]));
+    }
+  }
+
+  private void fillLongResultArray(ProjectionBlock projectionBlock, int[] 
leftValues, int length) {
+    long[] rightValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Long.compare(leftValues[i], rightValues[i]));
+    }
+  }
+
+  private void fillFloatResultArray(ProjectionBlock projectionBlock, int[] 
leftValues, int length) {
+    float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Double.compare(leftValues[i], 
rightFloatValues[i]));
+    }
+  }
+
+  private void fillDoubleResultArray(ProjectionBlock projectionBlock, int[] 
leftValues, int length) {
+    double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Double.compare(leftValues[i], 
rightDoubleValues[i]));
+    }
+  }
+
+  private void fillStringResultArray(ProjectionBlock projectionBlock, int[] 
leftValues, int length) {
+    String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      try {
+        _results[i] =
+            getIntResult(BigDecimal.valueOf(leftValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
+      } catch (NumberFormatException e) {
+        _results[i] = 0;
+      }
+    }
+  }
+
+  private void fillIntResultArray(ProjectionBlock projectionBlock, long[] 
leftIntValues, int length) {
+    int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Long.compare(leftIntValues[i], 
rightIntValues[i]));
+    }
+  }
+
+  private void fillLongResultArray(ProjectionBlock projectionBlock, long[] 
leftValues, int length) {
+    long[] rightValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Long.compare(leftValues[i], rightValues[i]));
+    }
+  }
+
+  private void fillFloatResultArray(ProjectionBlock projectionBlock, long[] 
leftValues, int length) {
+    float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Double.compare(leftValues[i], 
rightFloatValues[i]));
+    }
+  }
+
+  private void fillDoubleResultArray(ProjectionBlock projectionBlock, long[] 
leftValues, int length) {
+    double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Double.compare(leftValues[i], 
rightDoubleValues[i]));
+    }
+  }
+
+  private void fillStringResultArray(ProjectionBlock projectionBlock, long[] 
leftValues, int length) {
+    String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      try {
+        _results[i] =
+            getIntResult(BigDecimal.valueOf(leftValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
+      } catch (NumberFormatException e) {
+        _results[i] = 0;
+      }
+    }
+  }
+
+  private void fillIntResultArray(ProjectionBlock projectionBlock, float[] 
leftValues, int length) {
+    int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Double.compare(leftValues[i], 
rightIntValues[i]));
+    }
+  }
+
+  private void fillLongResultArray(ProjectionBlock projectionBlock, float[] 
leftValues, int length) {
+    long[] rightValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = 
getIntResult(BigDecimal.valueOf(leftValues[i]).compareTo(BigDecimal.valueOf(rightValues[i])));
+    }
+  }
+
+  private void fillFloatResultArray(ProjectionBlock projectionBlock, float[] 
leftValues, int length) {
+    float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Float.compare(leftValues[i], 
rightFloatValues[i]));
+    }
+  }
+
+  private void fillDoubleResultArray(ProjectionBlock projectionBlock, float[] 
leftValues, int length) {
+    double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Double.compare(leftValues[i], 
rightDoubleValues[i]));
+    }
+  }
+
+  private void fillStringResultArray(ProjectionBlock projectionBlock, float[] 
leftValues, int length) {
+    String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      try {
+        _results[i] =
+            getIntResult(BigDecimal.valueOf(leftValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
+      } catch (NumberFormatException e) {
+        _results[i] = 0;
+      }
+    }
+  }
+
+  private void fillIntResultArray(ProjectionBlock projectionBlock, double[] 
leftValues, int length) {
+    int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Double.compare(leftValues[i], 
rightIntValues[i]));
+    }
+  }
+
+  private void fillLongResultArray(ProjectionBlock projectionBlock, double[] 
leftValues, int length) {
+    long[] rightValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = 
getIntResult(BigDecimal.valueOf(leftValues[i]).compareTo(BigDecimal.valueOf(rightValues[i])));
+    }
+  }
+
+  private void fillFloatResultArray(ProjectionBlock projectionBlock, double[] 
leftValues, int length) {
+    float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Double.compare(leftValues[i], 
rightFloatValues[i]));
+    }
+  }
+
+  private void fillDoubleResultArray(ProjectionBlock projectionBlock, double[] 
leftValues, int length) {
+    double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Double.compare(leftValues[i], 
rightDoubleValues[i]));
+    }
+  }
+
+  private void fillStringResultArray(ProjectionBlock projectionBlock, double[] 
leftValues, int length) {
+    String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      try {
+        _results[i] =
+            getIntResult(BigDecimal.valueOf(leftValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
+      } catch (NumberFormatException e) {
+        _results[i] = 0;
+      }
     }
   }
 
   private int getIntResult(int comparisonResult) {
     return getBinaryFuncResult(comparisonResult) ? 1 : 0;
   }
 
-  protected abstract boolean getBinaryFuncResult(int comparisonResult);
+  private boolean getBinaryFuncResult(int comparisonResult) {
+    switch (_op) {
+      case EQUALS:
+        return comparisonResult == 0;
+      case GREATER_THAN_OR_EQUAL:
+        return comparisonResult >= 0;
+      case GREATER_THAN:
+        return comparisonResult > 0;
+      case LESS_THAN:
+        return comparisonResult < 0;
+      case LESS_THAN_OR_EQUAL:
+        return comparisonResult <= 0;
+      case NOT_EQUAL:
+        return comparisonResult != 0;
+      default:
+        assert false;

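Review comment:
      Should this `default` branch throw an exception (e.g. `IllegalStateException`) instead of using `assert false`? Java assertions are disabled at runtime unless the JVM is started with `-ea`, so `assert false` does nothing in a normal deployment.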

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/BinaryOperatorTransformFunction.java
##########
@@ -71,218 +113,331 @@ public TransformResultMetadata getResultMetadata() {
   }
 
   private void fillResultArray(ProjectionBlock projectionBlock) {
-    if (_results == null) {
-      _results = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
-    }
     int length = projectionBlock.getNumDocs();
+    if (_results == null || _results.length < length) {
+      _results = new int[length];
+    }
     switch (_leftStoredType) {
       case INT:
-        int[] leftIntValues = 
_leftTransformFunction.transformToIntValuesSV(projectionBlock);
-        switch (_rightStoredType) {
-          case INT:
-            int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Integer.compare(leftIntValues[i], 
rightIntValues[i]));
-            }
-            break;
-          case LONG:
-            long[] rightLongValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Long.compare(leftIntValues[i], 
rightLongValues[i]));
-            }
-            break;
-          case FLOAT:
-            float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftIntValues[i], 
rightFloatValues[i]));
-            }
-            break;
-          case DOUBLE:
-            double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftIntValues[i], 
rightDoubleValues[i]));
-            }
-            break;
-          case STRING:
-            String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              try {
-                _results[i] =
-                    
getIntResult(BigDecimal.valueOf(leftIntValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
-              } catch (NumberFormatException e) {
-                _results[i] = 0;
-              }
-            }
-            break;
-          default:
-            throw new IllegalStateException(String.format(
-                "Unsupported data type for comparison: [Left Transform 
Function [%s] result type is [%s], Right "
-                    + "Transform Function [%s] result type is [%s]]", 
_leftTransformFunction.getName(), _leftStoredType,
-                _rightTransformFunction.getName(), _rightStoredType));
-        }
+        fillResultInt(projectionBlock, length);
         break;
       case LONG:
-        long[] leftLongValues = 
_leftTransformFunction.transformToLongValuesSV(projectionBlock);
-        switch (_rightStoredType) {
-          case INT:
-            int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Long.compare(leftLongValues[i], 
rightIntValues[i]));
-            }
-            break;
-          case LONG:
-            long[] rightLongValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Long.compare(leftLongValues[i], 
rightLongValues[i]));
-            }
-            break;
-          case FLOAT:
-            float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(
-                  
BigDecimal.valueOf(leftLongValues[i]).compareTo(BigDecimal.valueOf(rightFloatValues[i])));
-            }
-            break;
-          case DOUBLE:
-            double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(
-                  
BigDecimal.valueOf(leftLongValues[i]).compareTo(BigDecimal.valueOf(rightDoubleValues[i])));
-            }
-            break;
-          case STRING:
-            String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              try {
-                _results[i] =
-                    
getIntResult(BigDecimal.valueOf(leftLongValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
-              } catch (NumberFormatException e) {
-                _results[i] = 0;
-              }
-            }
-            break;
-          default:
-            throw new IllegalStateException(String.format(
-                "Unsupported data type for comparison: [Left Transform 
Function [%s] result type is [%s], Right "
-                    + "Transform Function [%s] result type is [%s]]", 
_leftTransformFunction.getName(), _leftStoredType,
-                _rightTransformFunction.getName(), _rightStoredType));
-        }
+        fillResultLong(projectionBlock, length);
         break;
       case FLOAT:
-        float[] leftFloatValues = 
_leftTransformFunction.transformToFloatValuesSV(projectionBlock);
-        switch (_rightStoredType) {
-          case INT:
-            int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftFloatValues[i], 
rightIntValues[i]));
-            }
-            break;
-          case LONG:
-            long[] rightLongValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(
-                  
BigDecimal.valueOf(leftFloatValues[i]).compareTo(BigDecimal.valueOf(rightLongValues[i])));
-            }
-            break;
-          case FLOAT:
-            float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Float.compare(leftFloatValues[i], 
rightFloatValues[i]));
-            }
-            break;
-          case DOUBLE:
-            double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftFloatValues[i], 
rightDoubleValues[i]));
-            }
-            break;
-          case STRING:
-            String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              try {
-                _results[i] = getIntResult(
-                    BigDecimal.valueOf(leftFloatValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
-              } catch (NumberFormatException e) {
-                _results[i] = 0;
-              }
-            }
-            break;
-          default:
-            throw new IllegalStateException(String.format(
-                "Unsupported data type for comparison: [Left Transform 
Function [%s] result type is [%s], Right "
-                    + "Transform Function [%s] result type is [%s]]", 
_leftTransformFunction.getName(), _leftStoredType,
-                _rightTransformFunction.getName(), _rightStoredType));
-        }
+        fillResultFloat(projectionBlock, length);
         break;
       case DOUBLE:
-        double[] leftDoubleValues = 
_leftTransformFunction.transformToDoubleValuesSV(projectionBlock);
-        switch (_rightStoredType) {
-          case INT:
-            int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftDoubleValues[i], 
rightIntValues[i]));
-            }
-            break;
-          case LONG:
-            long[] rightLongValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(
-                  
BigDecimal.valueOf(leftDoubleValues[i]).compareTo(BigDecimal.valueOf(rightLongValues[i])));
-            }
-            break;
-          case FLOAT:
-            float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftDoubleValues[i], 
rightFloatValues[i]));
-            }
-            break;
-          case DOUBLE:
-            double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftDoubleValues[i], 
rightDoubleValues[i]));
-            }
-            break;
-          case STRING:
-            String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              try {
-                _results[i] = getIntResult(
-                    BigDecimal.valueOf(leftDoubleValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
-              } catch (NumberFormatException e) {
-                _results[i] = 0;
-              }
-            }
-            break;
-          default:
-            throw new IllegalStateException(String.format(
-                "Unsupported data type for comparison: [Left Transform 
Function [%s] result type is [%s], Right "
-                    + "Transform Function [%s] result type is [%s]]", 
_leftTransformFunction.getName(), _leftStoredType,
-                _rightTransformFunction.getName(), _rightStoredType));
-        }
+        fillResultDouble(projectionBlock, length);
         break;
       case STRING:
-        String[] leftStringValues = 
_leftTransformFunction.transformToStringValuesSV(projectionBlock);
-        String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
-        for (int i = 0; i < length; i++) {
-          _results[i] = 
getIntResult(leftStringValues[i].compareTo(rightStringValues[i]));
-        }
+        fillResultString(projectionBlock, length);
         break;
       case BYTES:
-        byte[][] leftBytesValues = 
_leftTransformFunction.transformToBytesValuesSV(projectionBlock);
-        byte[][] rightBytesValues = 
_rightTransformFunction.transformToBytesValuesSV(projectionBlock);
-        for (int i = 0; i < length; i++) {
-          _results[i] = getIntResult((new 
ByteArray(leftBytesValues[i])).compareTo(new ByteArray(rightBytesValues[i])));
-        }
+        fillResultBytes(projectionBlock, length);
         break;
       // NOTE: Multi-value columns are not comparable, so we should not reach 
here
       default:
-        throw new IllegalStateException();
+        throw illegalState();
+    }
+  }
+
+  private void fillResultInt(ProjectionBlock projectionBlock, int length) {
+    int[] leftIntValues = 
_leftTransformFunction.transformToIntValuesSV(projectionBlock);
+    switch (_rightStoredType) {
+      case INT:
+        fillIntResultArray(projectionBlock, leftIntValues, length);
+        break;
+      case LONG:
+        fillLongResultArray(projectionBlock, leftIntValues, length);
+        break;
+      case FLOAT:
+        fillFloatResultArray(projectionBlock, leftIntValues, length);
+        break;
+      case DOUBLE:
+        fillDoubleResultArray(projectionBlock, leftIntValues, length);
+        break;
+      case STRING:
+        fillStringResultArray(projectionBlock, leftIntValues, length);
+        break;
+      default:
+        throw illegalState();
+    }
+  }
+
+  private void fillResultLong(ProjectionBlock projectionBlock, int length) {
+    long[] leftLongValues = 
_leftTransformFunction.transformToLongValuesSV(projectionBlock);
+    switch (_rightStoredType) {
+      case INT:
+        fillIntResultArray(projectionBlock, leftLongValues, length);
+        break;
+      case LONG:
+        fillLongResultArray(projectionBlock, leftLongValues, length);
+        break;
+      case FLOAT:
+        fillFloatResultArray(projectionBlock, leftLongValues, length);
+        break;
+      case DOUBLE:
+        fillDoubleResultArray(projectionBlock, leftLongValues, length);
+        break;
+      case STRING:
+        fillStringResultArray(projectionBlock, leftLongValues, length);
+        break;
+      default:
+        throw illegalState();
+    }
+  }
+
+  private void fillResultFloat(ProjectionBlock projectionBlock, int length) {
+    float[] leftFloatValues = 
_leftTransformFunction.transformToFloatValuesSV(projectionBlock);
+    switch (_rightStoredType) {
+      case INT:
+        fillIntResultArray(projectionBlock, leftFloatValues, length);
+        break;
+      case LONG:
+        fillLongResultArray(projectionBlock, leftFloatValues, length);
+        break;
+      case FLOAT:
+        fillFloatResultArray(projectionBlock, leftFloatValues, length);
+        break;
+      case DOUBLE:
+        fillDoubleResultArray(projectionBlock, leftFloatValues, length);
+        break;
+      case STRING:
+        fillStringResultArray(projectionBlock, leftFloatValues, length);
+        break;
+      default:
+        throw illegalState();
+    }
+  }
+
+  private void fillResultDouble(ProjectionBlock projectionBlock, int length) {
+    double[] leftDoubleValues = 
_leftTransformFunction.transformToDoubleValuesSV(projectionBlock);
+    switch (_rightStoredType) {
+      case INT:
+        fillIntResultArray(projectionBlock, leftDoubleValues, length);
+        break;
+      case LONG:
+        fillLongResultArray(projectionBlock, leftDoubleValues, length);
+        break;
+      case FLOAT:
+        fillFloatResultArray(projectionBlock, leftDoubleValues, length);
+        break;
+      case DOUBLE:
+        fillDoubleResultArray(projectionBlock, leftDoubleValues, length);
+        break;
+      case STRING:
+        fillStringResultArray(projectionBlock, leftDoubleValues, length);
+        break;
+      default:
+        throw illegalState();
+    }
+  }
+
+  private IllegalStateException illegalState() {
+    throw new IllegalStateException(String.format(
+        "Unsupported data type for comparison: [Left Transform Function [%s] 
result type is [%s], Right "
+            + "Transform Function [%s] result type is [%s]]", 
_leftTransformFunction.getName(), _leftStoredType,
+        _rightTransformFunction.getName(), _rightStoredType));
+  }
+
+  private void fillResultString(ProjectionBlock projectionBlock, int length) {
+    String[] leftStringValues = 
_leftTransformFunction.transformToStringValuesSV(projectionBlock);
+    String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = 
getIntResult(leftStringValues[i].compareTo(rightStringValues[i]));
+    }
+  }
+
+  private void fillResultBytes(ProjectionBlock projectionBlock, int length) {
+    byte[][] leftBytesValues = 
_leftTransformFunction.transformToBytesValuesSV(projectionBlock);
+    byte[][] rightBytesValues = 
_rightTransformFunction.transformToBytesValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult((new 
ByteArray(leftBytesValues[i])).compareTo(new ByteArray(rightBytesValues[i])));

Review comment:
       Avoid creating the two `ByteArray` wrapper objects for every row; compare the raw byte arrays directly
   ```suggestion
         _results[i] = getIntResult(ByteArray.compare(leftBytesValues[i], rightBytesValues[i]));
   ```

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/BinaryOperatorTransformFunction.java
##########
@@ -71,218 +113,331 @@ public TransformResultMetadata getResultMetadata() {
   }
 
   private void fillResultArray(ProjectionBlock projectionBlock) {
-    if (_results == null) {
-      _results = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
-    }
     int length = projectionBlock.getNumDocs();
+    if (_results == null || _results.length < length) {
+      _results = new int[length];
+    }
     switch (_leftStoredType) {
       case INT:
-        int[] leftIntValues = 
_leftTransformFunction.transformToIntValuesSV(projectionBlock);
-        switch (_rightStoredType) {
-          case INT:
-            int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Integer.compare(leftIntValues[i], 
rightIntValues[i]));
-            }
-            break;
-          case LONG:
-            long[] rightLongValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Long.compare(leftIntValues[i], 
rightLongValues[i]));
-            }
-            break;
-          case FLOAT:
-            float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftIntValues[i], 
rightFloatValues[i]));
-            }
-            break;
-          case DOUBLE:
-            double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftIntValues[i], 
rightDoubleValues[i]));
-            }
-            break;
-          case STRING:
-            String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              try {
-                _results[i] =
-                    
getIntResult(BigDecimal.valueOf(leftIntValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
-              } catch (NumberFormatException e) {
-                _results[i] = 0;
-              }
-            }
-            break;
-          default:
-            throw new IllegalStateException(String.format(
-                "Unsupported data type for comparison: [Left Transform 
Function [%s] result type is [%s], Right "
-                    + "Transform Function [%s] result type is [%s]]", 
_leftTransformFunction.getName(), _leftStoredType,
-                _rightTransformFunction.getName(), _rightStoredType));
-        }
+        fillResultInt(projectionBlock, length);
         break;
       case LONG:
-        long[] leftLongValues = 
_leftTransformFunction.transformToLongValuesSV(projectionBlock);
-        switch (_rightStoredType) {
-          case INT:
-            int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Long.compare(leftLongValues[i], 
rightIntValues[i]));
-            }
-            break;
-          case LONG:
-            long[] rightLongValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Long.compare(leftLongValues[i], 
rightLongValues[i]));
-            }
-            break;
-          case FLOAT:
-            float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(
-                  
BigDecimal.valueOf(leftLongValues[i]).compareTo(BigDecimal.valueOf(rightFloatValues[i])));
-            }
-            break;
-          case DOUBLE:
-            double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(
-                  
BigDecimal.valueOf(leftLongValues[i]).compareTo(BigDecimal.valueOf(rightDoubleValues[i])));
-            }
-            break;
-          case STRING:
-            String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              try {
-                _results[i] =
-                    
getIntResult(BigDecimal.valueOf(leftLongValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
-              } catch (NumberFormatException e) {
-                _results[i] = 0;
-              }
-            }
-            break;
-          default:
-            throw new IllegalStateException(String.format(
-                "Unsupported data type for comparison: [Left Transform 
Function [%s] result type is [%s], Right "
-                    + "Transform Function [%s] result type is [%s]]", 
_leftTransformFunction.getName(), _leftStoredType,
-                _rightTransformFunction.getName(), _rightStoredType));
-        }
+        fillResultLong(projectionBlock, length);
         break;
       case FLOAT:
-        float[] leftFloatValues = 
_leftTransformFunction.transformToFloatValuesSV(projectionBlock);
-        switch (_rightStoredType) {
-          case INT:
-            int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftFloatValues[i], 
rightIntValues[i]));
-            }
-            break;
-          case LONG:
-            long[] rightLongValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(
-                  
BigDecimal.valueOf(leftFloatValues[i]).compareTo(BigDecimal.valueOf(rightLongValues[i])));
-            }
-            break;
-          case FLOAT:
-            float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Float.compare(leftFloatValues[i], 
rightFloatValues[i]));
-            }
-            break;
-          case DOUBLE:
-            double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftFloatValues[i], 
rightDoubleValues[i]));
-            }
-            break;
-          case STRING:
-            String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              try {
-                _results[i] = getIntResult(
-                    BigDecimal.valueOf(leftFloatValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
-              } catch (NumberFormatException e) {
-                _results[i] = 0;
-              }
-            }
-            break;
-          default:
-            throw new IllegalStateException(String.format(
-                "Unsupported data type for comparison: [Left Transform 
Function [%s] result type is [%s], Right "
-                    + "Transform Function [%s] result type is [%s]]", 
_leftTransformFunction.getName(), _leftStoredType,
-                _rightTransformFunction.getName(), _rightStoredType));
-        }
+        fillResultFloat(projectionBlock, length);
         break;
       case DOUBLE:
-        double[] leftDoubleValues = 
_leftTransformFunction.transformToDoubleValuesSV(projectionBlock);
-        switch (_rightStoredType) {
-          case INT:
-            int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftDoubleValues[i], 
rightIntValues[i]));
-            }
-            break;
-          case LONG:
-            long[] rightLongValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(
-                  
BigDecimal.valueOf(leftDoubleValues[i]).compareTo(BigDecimal.valueOf(rightLongValues[i])));
-            }
-            break;
-          case FLOAT:
-            float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftDoubleValues[i], 
rightFloatValues[i]));
-            }
-            break;
-          case DOUBLE:
-            double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftDoubleValues[i], 
rightDoubleValues[i]));
-            }
-            break;
-          case STRING:
-            String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              try {
-                _results[i] = getIntResult(
-                    BigDecimal.valueOf(leftDoubleValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
-              } catch (NumberFormatException e) {
-                _results[i] = 0;
-              }
-            }
-            break;
-          default:
-            throw new IllegalStateException(String.format(
-                "Unsupported data type for comparison: [Left Transform 
Function [%s] result type is [%s], Right "
-                    + "Transform Function [%s] result type is [%s]]", 
_leftTransformFunction.getName(), _leftStoredType,
-                _rightTransformFunction.getName(), _rightStoredType));
-        }
+        fillResultDouble(projectionBlock, length);
         break;
       case STRING:
-        String[] leftStringValues = 
_leftTransformFunction.transformToStringValuesSV(projectionBlock);
-        String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
-        for (int i = 0; i < length; i++) {
-          _results[i] = 
getIntResult(leftStringValues[i].compareTo(rightStringValues[i]));
-        }
+        fillResultString(projectionBlock, length);
         break;
       case BYTES:
-        byte[][] leftBytesValues = 
_leftTransformFunction.transformToBytesValuesSV(projectionBlock);
-        byte[][] rightBytesValues = 
_rightTransformFunction.transformToBytesValuesSV(projectionBlock);
-        for (int i = 0; i < length; i++) {
-          _results[i] = getIntResult((new 
ByteArray(leftBytesValues[i])).compareTo(new ByteArray(rightBytesValues[i])));
-        }
+        fillResultBytes(projectionBlock, length);
         break;
       // NOTE: Multi-value columns are not comparable, so we should not reach 
here
       default:
-        throw new IllegalStateException();
+        throw illegalState();
+    }
+  }
+
+  private void fillResultInt(ProjectionBlock projectionBlock, int length) {
+    int[] leftIntValues = 
_leftTransformFunction.transformToIntValuesSV(projectionBlock);
+    switch (_rightStoredType) {
+      case INT:
+        fillIntResultArray(projectionBlock, leftIntValues, length);
+        break;
+      case LONG:
+        fillLongResultArray(projectionBlock, leftIntValues, length);
+        break;
+      case FLOAT:
+        fillFloatResultArray(projectionBlock, leftIntValues, length);
+        break;
+      case DOUBLE:
+        fillDoubleResultArray(projectionBlock, leftIntValues, length);
+        break;
+      case STRING:
+        fillStringResultArray(projectionBlock, leftIntValues, length);
+        break;
+      default:
+        throw illegalState();
+    }
+  }
+
+  private void fillResultLong(ProjectionBlock projectionBlock, int length) {
+    long[] leftLongValues = 
_leftTransformFunction.transformToLongValuesSV(projectionBlock);
+    switch (_rightStoredType) {
+      case INT:
+        fillIntResultArray(projectionBlock, leftLongValues, length);
+        break;
+      case LONG:
+        fillLongResultArray(projectionBlock, leftLongValues, length);
+        break;
+      case FLOAT:
+        fillFloatResultArray(projectionBlock, leftLongValues, length);
+        break;
+      case DOUBLE:
+        fillDoubleResultArray(projectionBlock, leftLongValues, length);
+        break;
+      case STRING:
+        fillStringResultArray(projectionBlock, leftLongValues, length);
+        break;
+      default:
+        throw illegalState();
+    }
+  }
+
+  private void fillResultFloat(ProjectionBlock projectionBlock, int length) {
+    float[] leftFloatValues = 
_leftTransformFunction.transformToFloatValuesSV(projectionBlock);
+    switch (_rightStoredType) {
+      case INT:
+        fillIntResultArray(projectionBlock, leftFloatValues, length);
+        break;
+      case LONG:
+        fillLongResultArray(projectionBlock, leftFloatValues, length);
+        break;
+      case FLOAT:
+        fillFloatResultArray(projectionBlock, leftFloatValues, length);
+        break;
+      case DOUBLE:
+        fillDoubleResultArray(projectionBlock, leftFloatValues, length);
+        break;
+      case STRING:
+        fillStringResultArray(projectionBlock, leftFloatValues, length);
+        break;
+      default:
+        throw illegalState();
+    }
+  }
+
+  private void fillResultDouble(ProjectionBlock projectionBlock, int length) {
+    double[] leftDoubleValues = 
_leftTransformFunction.transformToDoubleValuesSV(projectionBlock);
+    switch (_rightStoredType) {
+      case INT:
+        fillIntResultArray(projectionBlock, leftDoubleValues, length);
+        break;
+      case LONG:
+        fillLongResultArray(projectionBlock, leftDoubleValues, length);
+        break;
+      case FLOAT:
+        fillFloatResultArray(projectionBlock, leftDoubleValues, length);
+        break;
+      case DOUBLE:
+        fillDoubleResultArray(projectionBlock, leftDoubleValues, length);
+        break;
+      case STRING:
+        fillStringResultArray(projectionBlock, leftDoubleValues, length);
+        break;
+      default:
+        throw illegalState();
+    }
+  }
+
+  private IllegalStateException illegalState() {
+    throw new IllegalStateException(String.format(
+        "Unsupported data type for comparison: [Left Transform Function [%s] 
result type is [%s], Right "
+            + "Transform Function [%s] result type is [%s]]", 
_leftTransformFunction.getName(), _leftStoredType,
+        _rightTransformFunction.getName(), _rightStoredType));
+  }
+
+  private void fillResultString(ProjectionBlock projectionBlock, int length) {
+    String[] leftStringValues = 
_leftTransformFunction.transformToStringValuesSV(projectionBlock);
+    String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = 
getIntResult(leftStringValues[i].compareTo(rightStringValues[i]));
+    }
+  }
+
+  private void fillResultBytes(ProjectionBlock projectionBlock, int length) {
+    byte[][] leftBytesValues = 
_leftTransformFunction.transformToBytesValuesSV(projectionBlock);
+    byte[][] rightBytesValues = 
_rightTransformFunction.transformToBytesValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult((new 
ByteArray(leftBytesValues[i])).compareTo(new ByteArray(rightBytesValues[i])));
+    }
+  }
+
+  private void fillIntResultArray(ProjectionBlock projectionBlock, int[] 
leftIntValues, int length) {
+    int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Integer.compare(leftIntValues[i], 
rightIntValues[i]));
+    }
+  }
+
+  private void fillLongResultArray(ProjectionBlock projectionBlock, int[] 
leftValues, int length) {
+    long[] rightValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Long.compare(leftValues[i], rightValues[i]));
+    }
+  }
+
+  private void fillFloatResultArray(ProjectionBlock projectionBlock, int[] 
leftValues, int length) {
+    float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Double.compare(leftValues[i], 
rightFloatValues[i]));
+    }
+  }
+
+  private void fillDoubleResultArray(ProjectionBlock projectionBlock, int[] 
leftValues, int length) {
+    double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Double.compare(leftValues[i], 
rightDoubleValues[i]));
+    }
+  }
+
+  private void fillStringResultArray(ProjectionBlock projectionBlock, int[] 
leftValues, int length) {
+    String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      try {
+        _results[i] =
+            getIntResult(BigDecimal.valueOf(leftValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
+      } catch (NumberFormatException e) {
+        _results[i] = 0;
+      }
+    }
+  }
+
+  private void fillIntResultArray(ProjectionBlock projectionBlock, long[] 
leftIntValues, int length) {
+    int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Long.compare(leftIntValues[i], 
rightIntValues[i]));
+    }
+  }
+
+  private void fillLongResultArray(ProjectionBlock projectionBlock, long[] 
leftValues, int length) {
+    long[] rightValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Long.compare(leftValues[i], rightValues[i]));
+    }
+  }
+
+  private void fillFloatResultArray(ProjectionBlock projectionBlock, long[] 
leftValues, int length) {
+    float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Double.compare(leftValues[i], 
rightFloatValues[i]));
+    }
+  }
+
+  private void fillDoubleResultArray(ProjectionBlock projectionBlock, long[] 
leftValues, int length) {
+    double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Double.compare(leftValues[i], 
rightDoubleValues[i]));

Review comment:
       Same here

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/BinaryOperatorTransformFunction.java
##########
@@ -71,218 +113,331 @@ public TransformResultMetadata getResultMetadata() {
   }
 
   private void fillResultArray(ProjectionBlock projectionBlock) {
-    if (_results == null) {
-      _results = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
-    }
     int length = projectionBlock.getNumDocs();
+    if (_results == null || _results.length < length) {
+      _results = new int[length];
+    }
     switch (_leftStoredType) {
       case INT:
-        int[] leftIntValues = 
_leftTransformFunction.transformToIntValuesSV(projectionBlock);
-        switch (_rightStoredType) {
-          case INT:
-            int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Integer.compare(leftIntValues[i], 
rightIntValues[i]));
-            }
-            break;
-          case LONG:
-            long[] rightLongValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Long.compare(leftIntValues[i], 
rightLongValues[i]));
-            }
-            break;
-          case FLOAT:
-            float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftIntValues[i], 
rightFloatValues[i]));
-            }
-            break;
-          case DOUBLE:
-            double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftIntValues[i], 
rightDoubleValues[i]));
-            }
-            break;
-          case STRING:
-            String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              try {
-                _results[i] =
-                    
getIntResult(BigDecimal.valueOf(leftIntValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
-              } catch (NumberFormatException e) {
-                _results[i] = 0;
-              }
-            }
-            break;
-          default:
-            throw new IllegalStateException(String.format(
-                "Unsupported data type for comparison: [Left Transform 
Function [%s] result type is [%s], Right "
-                    + "Transform Function [%s] result type is [%s]]", 
_leftTransformFunction.getName(), _leftStoredType,
-                _rightTransformFunction.getName(), _rightStoredType));
-        }
+        fillResultInt(projectionBlock, length);
         break;
       case LONG:
-        long[] leftLongValues = 
_leftTransformFunction.transformToLongValuesSV(projectionBlock);
-        switch (_rightStoredType) {
-          case INT:
-            int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Long.compare(leftLongValues[i], 
rightIntValues[i]));
-            }
-            break;
-          case LONG:
-            long[] rightLongValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Long.compare(leftLongValues[i], 
rightLongValues[i]));
-            }
-            break;
-          case FLOAT:
-            float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(
-                  
BigDecimal.valueOf(leftLongValues[i]).compareTo(BigDecimal.valueOf(rightFloatValues[i])));
-            }
-            break;
-          case DOUBLE:
-            double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(
-                  
BigDecimal.valueOf(leftLongValues[i]).compareTo(BigDecimal.valueOf(rightDoubleValues[i])));
-            }
-            break;
-          case STRING:
-            String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              try {
-                _results[i] =
-                    
getIntResult(BigDecimal.valueOf(leftLongValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
-              } catch (NumberFormatException e) {
-                _results[i] = 0;
-              }
-            }
-            break;
-          default:
-            throw new IllegalStateException(String.format(
-                "Unsupported data type for comparison: [Left Transform 
Function [%s] result type is [%s], Right "
-                    + "Transform Function [%s] result type is [%s]]", 
_leftTransformFunction.getName(), _leftStoredType,
-                _rightTransformFunction.getName(), _rightStoredType));
-        }
+        fillResultLong(projectionBlock, length);
         break;
       case FLOAT:
-        float[] leftFloatValues = 
_leftTransformFunction.transformToFloatValuesSV(projectionBlock);
-        switch (_rightStoredType) {
-          case INT:
-            int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftFloatValues[i], 
rightIntValues[i]));
-            }
-            break;
-          case LONG:
-            long[] rightLongValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(
-                  
BigDecimal.valueOf(leftFloatValues[i]).compareTo(BigDecimal.valueOf(rightLongValues[i])));
-            }
-            break;
-          case FLOAT:
-            float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Float.compare(leftFloatValues[i], 
rightFloatValues[i]));
-            }
-            break;
-          case DOUBLE:
-            double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftFloatValues[i], 
rightDoubleValues[i]));
-            }
-            break;
-          case STRING:
-            String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              try {
-                _results[i] = getIntResult(
-                    BigDecimal.valueOf(leftFloatValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
-              } catch (NumberFormatException e) {
-                _results[i] = 0;
-              }
-            }
-            break;
-          default:
-            throw new IllegalStateException(String.format(
-                "Unsupported data type for comparison: [Left Transform 
Function [%s] result type is [%s], Right "
-                    + "Transform Function [%s] result type is [%s]]", 
_leftTransformFunction.getName(), _leftStoredType,
-                _rightTransformFunction.getName(), _rightStoredType));
-        }
+        fillResultFloat(projectionBlock, length);
         break;
       case DOUBLE:
-        double[] leftDoubleValues = 
_leftTransformFunction.transformToDoubleValuesSV(projectionBlock);
-        switch (_rightStoredType) {
-          case INT:
-            int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftDoubleValues[i], 
rightIntValues[i]));
-            }
-            break;
-          case LONG:
-            long[] rightLongValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(
-                  
BigDecimal.valueOf(leftDoubleValues[i]).compareTo(BigDecimal.valueOf(rightLongValues[i])));
-            }
-            break;
-          case FLOAT:
-            float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftDoubleValues[i], 
rightFloatValues[i]));
-            }
-            break;
-          case DOUBLE:
-            double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              _results[i] = getIntResult(Double.compare(leftDoubleValues[i], 
rightDoubleValues[i]));
-            }
-            break;
-          case STRING:
-            String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
-            for (int i = 0; i < length; i++) {
-              try {
-                _results[i] = getIntResult(
-                    BigDecimal.valueOf(leftDoubleValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
-              } catch (NumberFormatException e) {
-                _results[i] = 0;
-              }
-            }
-            break;
-          default:
-            throw new IllegalStateException(String.format(
-                "Unsupported data type for comparison: [Left Transform 
Function [%s] result type is [%s], Right "
-                    + "Transform Function [%s] result type is [%s]]", 
_leftTransformFunction.getName(), _leftStoredType,
-                _rightTransformFunction.getName(), _rightStoredType));
-        }
+        fillResultDouble(projectionBlock, length);
         break;
       case STRING:
-        String[] leftStringValues = 
_leftTransformFunction.transformToStringValuesSV(projectionBlock);
-        String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
-        for (int i = 0; i < length; i++) {
-          _results[i] = 
getIntResult(leftStringValues[i].compareTo(rightStringValues[i]));
-        }
+        fillResultString(projectionBlock, length);
         break;
       case BYTES:
-        byte[][] leftBytesValues = 
_leftTransformFunction.transformToBytesValuesSV(projectionBlock);
-        byte[][] rightBytesValues = 
_rightTransformFunction.transformToBytesValuesSV(projectionBlock);
-        for (int i = 0; i < length; i++) {
-          _results[i] = getIntResult((new 
ByteArray(leftBytesValues[i])).compareTo(new ByteArray(rightBytesValues[i])));
-        }
+        fillResultBytes(projectionBlock, length);
         break;
       // NOTE: Multi-value columns are not comparable, so we should not reach 
here
       default:
-        throw new IllegalStateException();
+        throw illegalState();
+    }
+  }
+
+  private void fillResultInt(ProjectionBlock projectionBlock, int length) {
+    int[] leftIntValues = 
_leftTransformFunction.transformToIntValuesSV(projectionBlock);
+    switch (_rightStoredType) {
+      case INT:
+        fillIntResultArray(projectionBlock, leftIntValues, length);
+        break;
+      case LONG:
+        fillLongResultArray(projectionBlock, leftIntValues, length);
+        break;
+      case FLOAT:
+        fillFloatResultArray(projectionBlock, leftIntValues, length);
+        break;
+      case DOUBLE:
+        fillDoubleResultArray(projectionBlock, leftIntValues, length);
+        break;
+      case STRING:
+        fillStringResultArray(projectionBlock, leftIntValues, length);
+        break;
+      default:
+        throw illegalState();
+    }
+  }
+
+  private void fillResultLong(ProjectionBlock projectionBlock, int length) {
+    long[] leftLongValues = 
_leftTransformFunction.transformToLongValuesSV(projectionBlock);
+    switch (_rightStoredType) {
+      case INT:
+        fillIntResultArray(projectionBlock, leftLongValues, length);
+        break;
+      case LONG:
+        fillLongResultArray(projectionBlock, leftLongValues, length);
+        break;
+      case FLOAT:
+        fillFloatResultArray(projectionBlock, leftLongValues, length);
+        break;
+      case DOUBLE:
+        fillDoubleResultArray(projectionBlock, leftLongValues, length);
+        break;
+      case STRING:
+        fillStringResultArray(projectionBlock, leftLongValues, length);
+        break;
+      default:
+        throw illegalState();
+    }
+  }
+
+  private void fillResultFloat(ProjectionBlock projectionBlock, int length) {
+    float[] leftFloatValues = 
_leftTransformFunction.transformToFloatValuesSV(projectionBlock);
+    switch (_rightStoredType) {
+      case INT:
+        fillIntResultArray(projectionBlock, leftFloatValues, length);
+        break;
+      case LONG:
+        fillLongResultArray(projectionBlock, leftFloatValues, length);
+        break;
+      case FLOAT:
+        fillFloatResultArray(projectionBlock, leftFloatValues, length);
+        break;
+      case DOUBLE:
+        fillDoubleResultArray(projectionBlock, leftFloatValues, length);
+        break;
+      case STRING:
+        fillStringResultArray(projectionBlock, leftFloatValues, length);
+        break;
+      default:
+        throw illegalState();
+    }
+  }
+
+  private void fillResultDouble(ProjectionBlock projectionBlock, int length) {
+    double[] leftDoubleValues = 
_leftTransformFunction.transformToDoubleValuesSV(projectionBlock);
+    switch (_rightStoredType) {
+      case INT:
+        fillIntResultArray(projectionBlock, leftDoubleValues, length);
+        break;
+      case LONG:
+        fillLongResultArray(projectionBlock, leftDoubleValues, length);
+        break;
+      case FLOAT:
+        fillFloatResultArray(projectionBlock, leftDoubleValues, length);
+        break;
+      case DOUBLE:
+        fillDoubleResultArray(projectionBlock, leftDoubleValues, length);
+        break;
+      case STRING:
+        fillStringResultArray(projectionBlock, leftDoubleValues, length);
+        break;
+      default:
+        throw illegalState();
+    }
+  }
+
+  private IllegalStateException illegalState() {
+    throw new IllegalStateException(String.format(
+        "Unsupported data type for comparison: [Left Transform Function [%s] 
result type is [%s], Right "
+            + "Transform Function [%s] result type is [%s]]", 
_leftTransformFunction.getName(), _leftStoredType,
+        _rightTransformFunction.getName(), _rightStoredType));
+  }
+
+  private void fillResultString(ProjectionBlock projectionBlock, int length) {
+    String[] leftStringValues = 
_leftTransformFunction.transformToStringValuesSV(projectionBlock);
+    String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = 
getIntResult(leftStringValues[i].compareTo(rightStringValues[i]));
+    }
+  }
+
+  private void fillResultBytes(ProjectionBlock projectionBlock, int length) {
+    byte[][] leftBytesValues = 
_leftTransformFunction.transformToBytesValuesSV(projectionBlock);
+    byte[][] rightBytesValues = 
_rightTransformFunction.transformToBytesValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult((new 
ByteArray(leftBytesValues[i])).compareTo(new ByteArray(rightBytesValues[i])));
+    }
+  }
+
+  private void fillIntResultArray(ProjectionBlock projectionBlock, int[] 
leftIntValues, int length) {
+    int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Integer.compare(leftIntValues[i], 
rightIntValues[i]));
+    }
+  }
+
+  private void fillLongResultArray(ProjectionBlock projectionBlock, int[] 
leftValues, int length) {
+    long[] rightValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Long.compare(leftValues[i], rightValues[i]));
+    }
+  }
+
+  private void fillFloatResultArray(ProjectionBlock projectionBlock, int[] 
leftValues, int length) {
+    float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Double.compare(leftValues[i], 
rightFloatValues[i]));
+    }
+  }
+
+  private void fillDoubleResultArray(ProjectionBlock projectionBlock, int[] 
leftValues, int length) {
+    double[] rightDoubleValues = 
_rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Double.compare(leftValues[i], 
rightDoubleValues[i]));
+    }
+  }
+
+  private void fillStringResultArray(ProjectionBlock projectionBlock, int[] 
leftValues, int length) {
+    String[] rightStringValues = 
_rightTransformFunction.transformToStringValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      try {
+        _results[i] =
+            getIntResult(BigDecimal.valueOf(leftValues[i]).compareTo(new 
BigDecimal(rightStringValues[i])));
+      } catch (NumberFormatException e) {
+        _results[i] = 0;
+      }
+    }
+  }
+
+  private void fillIntResultArray(ProjectionBlock projectionBlock, long[] 
leftIntValues, int length) {
+    int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Long.compare(leftIntValues[i], 
rightIntValues[i]));
+    }
+  }
+
+  private void fillLongResultArray(ProjectionBlock projectionBlock, long[] 
leftValues, int length) {
+    long[] rightValues = 
_rightTransformFunction.transformToLongValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Long.compare(leftValues[i], rightValues[i]));
+    }
+  }
+
+  private void fillFloatResultArray(ProjectionBlock projectionBlock, long[] 
leftValues, int length) {
+    float[] rightFloatValues = 
_rightTransformFunction.transformToFloatValuesSV(projectionBlock);
+    for (int i = 0; i < length; i++) {
+      _results[i] = getIntResult(Double.compare(leftValues[i], 
rightFloatValues[i]));

Review comment:
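       We should use `BigDecimal` for this comparison, as the removed code did
       for long vs. float: `Double.compare` widens the `long` to `double`, which
       can lose precision for large values.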




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to