Jackie-Jiang commented on code in PR #8927:
URL: https://github.com/apache/pinot/pull/8927#discussion_r911061582


##########
pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java:
##########
@@ -40,12 +44,40 @@
 public class TransformBlockValSet implements BlockValSet {
   private final ProjectionBlock _projectionBlock;
   private final TransformFunction _transformFunction;
+  private final RoaringBitmap _nullBitmap;
 
   private int[] _numMVEntries;
 
-  public TransformBlockValSet(ProjectionBlock projectionBlock, 
TransformFunction transformFunction) {
+  public TransformBlockValSet(ProjectionBlock projectionBlock, 
TransformFunction transformFunction,
+      ExpressionContext expression) {
     _projectionBlock = projectionBlock;
     _transformFunction = transformFunction;
+    RoaringBitmap nullBitmap = null;

Review Comment:
   I feel this logic should be performed in the operator instead of in the 
value set. We need to compute the null bitmap only when null handling is 
enabled.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java:
##########
@@ -311,16 +337,51 @@ private DataTable getResultDataTable()
       throws IOException {
     DataTableBuilder dataTableBuilder = 
DataTableFactory.getDataTableBuilder(_dataSchema);
     ColumnDataType[] storedColumnDataTypes = 
_dataSchema.getStoredColumnDataTypes();
+    int numColumns = _dataSchema.size();
     Iterator<Record> iterator = _table.iterator();
-    while (iterator.hasNext()) {
-      Record record = iterator.next();
-      dataTableBuilder.startRow();
-      int columnIndex = 0;
-      for (Object value : record.getValues()) {
-        setDataTableColumn(storedColumnDataTypes[columnIndex], 
dataTableBuilder, columnIndex, value);
-        columnIndex++;
+    RoaringBitmap[] nullBitmaps = null;
+    if (_isNullHandlingEnabled) {
+      nullBitmaps = new RoaringBitmap[numColumns];
+      Object[] colDefaultNullValues = new Object[numColumns];
+      for (int colId = 0; colId < numColumns; colId++) {
+        if (storedColumnDataTypes[colId] != ColumnDataType.OBJECT) {
+          // Store a dummy value that is both a valid numeric, and a valid hex 
encoded value.
+          String specialVal = "30";
+          colDefaultNullValues[colId] = 
storedColumnDataTypes[colId].toDataType().convert(specialVal);
+        }
+        nullBitmaps[colId] = new RoaringBitmap();
+      }
+      int rowId = 0;
+      while (iterator.hasNext()) {
+        Object[] values = iterator.next().getValues();
+        dataTableBuilder.startRow();
+        for (int columnIndex = 0; columnIndex < values.length; columnIndex++) {
+          Object value = values[columnIndex];
+          if (value == null) {
+            value = colDefaultNullValues[columnIndex];
+            nullBitmaps[columnIndex].add(rowId);
+          }
+          setDataTableColumn(storedColumnDataTypes[columnIndex], 
dataTableBuilder, columnIndex, value);
+        }
+        dataTableBuilder.finishRow();
+        rowId++;
+      }
+    } else {
+      while (iterator.hasNext()) {
+        Record record = iterator.next();
+        dataTableBuilder.startRow();
+        int columnIndex = 0;
+        for (Object value : record.getValues()) {
+          setDataTableColumn(storedColumnDataTypes[columnIndex], 
dataTableBuilder, columnIndex, value);
+          columnIndex++;
+        }
+        dataTableBuilder.finishRow();
+      }
+    }
+    if (_isNullHandlingEnabled && DataTableFactory.getDataTableVersion() >= 
DataTableFactory.VERSION_4) {

Review Comment:
   This check should be performed much earlier, probably as soon as the server 
receives a query with null handling enabled.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java:
##########
@@ -390,28 +452,57 @@ private DataTable getAggregationResultDataTable()
       columnNames[i] = aggregationFunction.getColumnName();
       columnDataTypes[i] = 
aggregationFunction.getIntermediateResultColumnType();
     }
+    RoaringBitmap[] nullBitmaps = null;
+    Object[] colDefaultNullValues = null;
+    if (_isNullHandlingEnabled) {
+      colDefaultNullValues = new Object[numAggregationFunctions];
+      nullBitmaps = new RoaringBitmap[numAggregationFunctions];
+      for (int i = 0; i < numAggregationFunctions; i++) {
+        if (columnDataTypes[i] != ColumnDataType.OBJECT) {
+          // Store a dummy value that is both a valid numeric, and a valid hex 
encoded value.
+          String specialVal = "30";
+          colDefaultNullValues[i] = 
columnDataTypes[i].toDataType().convert(specialVal);
+        }
+        nullBitmaps[i] = new RoaringBitmap();
+      }
+    }
 
     // Build the data table.
     DataTableBuilder dataTableBuilder =
         DataTableFactory.getDataTableBuilder(new DataSchema(columnNames, 
columnDataTypes));
     dataTableBuilder.startRow();
     for (int i = 0; i < numAggregationFunctions; i++) {
+      Object value = _aggregationResult.get(i);
+      // OBJECT (e.g. DistinctTable) calls toBytes() (e.g. 
DistinctTable.toBytes()) which takes care of replacing nulls
+      // with default values, and building presence vector and serializing 
both.
+      if (_isNullHandlingEnabled && columnDataTypes[i] != 
ColumnDataType.OBJECT) {

Review Comment:
   Should we skip the null bitmap update?
   Suggest moving the null handling check outside of the for loop.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java:
##########
@@ -50,6 +53,26 @@ public ProjectionBlockValSet(DataBlockCache dataBlockCache, 
String column, DataS
     _dataBlockCache = dataBlockCache;
     _column = column;
     _dataSource = dataSource;
+    NullValueVectorReader nullValueReader = _dataSource == null ? null : 
_dataSource.getNullValueVector();

Review Comment:
   Data source should never be `null`. The computation should be lazy, and the 
operator should decide whether to read it based on whether null handling is enabled.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java:
##########
@@ -54,6 +55,7 @@
 @SuppressWarnings("rawtypes")
 public class IntermediateResultsBlock implements Block {
   private DataSchema _dataSchema;
+  private boolean _isNullHandlingEnabled;

Review Comment:
   (minor) Rename to `_nullHandlingEnabled`; same for other places.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java:
##########
@@ -141,13 +143,21 @@ private Comparator<Object[]> getComparator() {
       multipliers[i] = _orderByExpressions.get(valueIndex).isAsc() ? -1 : 1;
     }
 
-    return (o1, o2) -> {
+    return (Object[] o1, Object[] o2) -> {
       for (int i = 0; i < numValuesToCompare; i++) {
         int index = valueIndices[i];
 
         // TODO: Evaluate the performance of casting to Comparable and avoid 
the switch
         Object v1 = o1[index];
         Object v2 = o2[index];
+        if (_isNullHandlingEnabled) {

Review Comment:
   Move this check outside the comparator lambda so it is not evaluated on every comparison.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java:
##########
@@ -311,16 +337,51 @@ private DataTable getResultDataTable()
       throws IOException {
     DataTableBuilder dataTableBuilder = 
DataTableFactory.getDataTableBuilder(_dataSchema);
     ColumnDataType[] storedColumnDataTypes = 
_dataSchema.getStoredColumnDataTypes();
+    int numColumns = _dataSchema.size();
     Iterator<Record> iterator = _table.iterator();
-    while (iterator.hasNext()) {
-      Record record = iterator.next();
-      dataTableBuilder.startRow();
-      int columnIndex = 0;
-      for (Object value : record.getValues()) {
-        setDataTableColumn(storedColumnDataTypes[columnIndex], 
dataTableBuilder, columnIndex, value);
-        columnIndex++;
+    RoaringBitmap[] nullBitmaps = null;
+    if (_isNullHandlingEnabled) {
+      nullBitmaps = new RoaringBitmap[numColumns];
+      Object[] colDefaultNullValues = new Object[numColumns];
+      for (int colId = 0; colId < numColumns; colId++) {
+        if (storedColumnDataTypes[colId] != ColumnDataType.OBJECT) {
+          // Store a dummy value that is both a valid numeric, and a valid hex 
encoded value.
+          String specialVal = "30";
+          colDefaultNullValues[colId] = 
storedColumnDataTypes[colId].toDataType().convert(specialVal);
+        }
+        nullBitmaps[colId] = new RoaringBitmap();
+      }
+      int rowId = 0;
+      while (iterator.hasNext()) {
+        Object[] values = iterator.next().getValues();
+        dataTableBuilder.startRow();
+        for (int columnIndex = 0; columnIndex < values.length; columnIndex++) {
+          Object value = values[columnIndex];
+          if (value == null) {
+            value = colDefaultNullValues[columnIndex];
+            nullBitmaps[columnIndex].add(rowId);
+          }
+          setDataTableColumn(storedColumnDataTypes[columnIndex], 
dataTableBuilder, columnIndex, value);
+        }
+        dataTableBuilder.finishRow();
+        rowId++;
+      }
+    } else {
+      while (iterator.hasNext()) {
+        Record record = iterator.next();
+        dataTableBuilder.startRow();
+        int columnIndex = 0;
+        for (Object value : record.getValues()) {
+          setDataTableColumn(storedColumnDataTypes[columnIndex], 
dataTableBuilder, columnIndex, value);
+          columnIndex++;
+        }
+        dataTableBuilder.finishRow();
+      }
+    }
+    if (_isNullHandlingEnabled && DataTableFactory.getDataTableVersion() >= 
DataTableFactory.VERSION_4) {
+      for (int colId = 0; colId < numColumns; colId++) {

Review Comment:
   This part can be moved into the previous `if` block.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java:
##########
@@ -390,28 +452,57 @@ private DataTable getAggregationResultDataTable()
       columnNames[i] = aggregationFunction.getColumnName();
       columnDataTypes[i] = 
aggregationFunction.getIntermediateResultColumnType();
     }
+    RoaringBitmap[] nullBitmaps = null;
+    Object[] colDefaultNullValues = null;
+    if (_isNullHandlingEnabled) {

Review Comment:
   Same comments as the previous method



##########
pinot-common/src/thrift/query.thrift:
##########
@@ -37,6 +37,7 @@ struct PinotQuery {
   11: optional map<string, string> queryOptions;
   12: optional bool explain;
   13: optional map<Expression, Expression> expressionOverrideHints;
+  14: optional bool nullHandlingEnabled;

Review Comment:
   Let's not add this as a separate thrift field. It can be added as a query 
option instead. See `QueryOptionsUtils` for more query option examples.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java:
##########
@@ -50,9 +50,7 @@ public class StreamingSelectionOnlyCombineOperator extends 
BaseCombineOperator {
   private static final String EXPLAIN_NAME = "SELECT_STREAMING_COMBINE";
 
   // Special IntermediateResultsBlock to indicate that this is the last 
results block for an operator
-  private static final IntermediateResultsBlock LAST_RESULTS_BLOCK =

Review Comment:
   We shouldn't need to change this



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java:
##########
@@ -84,9 +84,23 @@ public TableResizer(DataSchema dataSchema, QueryContext 
queryContext) {
       _orderByValueExtractors[i] = 
getOrderByValueExtractor(orderByExpression.getExpression());
       comparators[i] = orderByExpression.isAsc() ? Comparator.naturalOrder() : 
Comparator.reverseOrder();
     }
+    boolean isNullHandlingEnabled = queryContext.isNullHandlingEnabled();
     _intermediateRecordComparator = (o1, o2) -> {
       for (int i = 0; i < _numOrderByExpressions; i++) {
-        int result = comparators[i].compare(o1._values[i], o2._values[i]);
+        Object v1 = o1._values[i];
+        Object v2 = o2._values[i];
+        if (isNullHandlingEnabled) {

Review Comment:
   We can move this check outside the comparator to avoid the per-comparison overhead:
   ```
   if (queryContext.isNullHandlingEnabled()) {
     _intermediateRecordComparator = ...
   } else {
     _intermediateRecordComparator = ...
   }
   ```



##########
pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java:
##########
@@ -49,27 +50,25 @@ public RowDataBlock(ByteBuffer byteBuffer)
     computeBlockObjectConstants();
   }
 
+  @Nullable
   @Override
   public RoaringBitmap getNullRowIds(int colId) {
     // _fixedSizeData stores two ints per col's null bitmap: offset, and 
length.
     int position = _numRows * _rowSizeInBytes + colId * Integer.BYTES * 2;
-    if (position >= _fixedSizeData.limit()) {
+    if (_fixedSizeData == null || position >= _fixedSizeData.limit()) {

Review Comment:
   (minor) The first `null` check seems redundant, as we don't have this check 
in other methods.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java:
##########
@@ -145,6 +167,10 @@ public void setDataSchema(DataSchema dataSchema) {
     _dataSchema = dataSchema;
   }
 
+  public boolean isNullHandlingEnabled() {

Review Comment:
   Is this for testing only? Ideally this info should be retrieved from the 
query context rather than from the intermediate result.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java:
##########
@@ -95,6 +96,14 @@ public TransformResultMetadata 
getResultMetadata(ExpressionContext expression) {
     return _transformFunctionMap.get(expression).getResultMetadata();
   }
 
+  public NullValueVectorReader getNullValueVectorReader(ExpressionContext 
expression) {

Review Comment:
   This method should be in the projection layer instead of the transform 
layer. In the transform layer, we should be able to resolve the null values for 
the expression instead of the column.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java:
##########
@@ -60,9 +60,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator 
extends BaseCombine
 
   // For min/max value based combine, when a thread detects that no more 
segments need to be processed, it inserts this
   // special IntermediateResultsBlock into the BlockingQueue to awake the main 
thread
-  private static final IntermediateResultsBlock LAST_RESULTS_BLOCK =
-      new IntermediateResultsBlock(new DataSchema(new String[0], new 
DataSchema.ColumnDataType[0]),
-          Collections.emptyList());
+  private final IntermediateResultsBlock _lastResultsBlock;

Review Comment:
   This shouldn't need to be changed



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java:
##########
@@ -311,16 +337,51 @@ private DataTable getResultDataTable()
       throws IOException {
     DataTableBuilder dataTableBuilder = 
DataTableFactory.getDataTableBuilder(_dataSchema);
     ColumnDataType[] storedColumnDataTypes = 
_dataSchema.getStoredColumnDataTypes();
+    int numColumns = _dataSchema.size();
     Iterator<Record> iterator = _table.iterator();
-    while (iterator.hasNext()) {
-      Record record = iterator.next();
-      dataTableBuilder.startRow();
-      int columnIndex = 0;
-      for (Object value : record.getValues()) {
-        setDataTableColumn(storedColumnDataTypes[columnIndex], 
dataTableBuilder, columnIndex, value);
-        columnIndex++;
+    RoaringBitmap[] nullBitmaps = null;
+    if (_isNullHandlingEnabled) {
+      nullBitmaps = new RoaringBitmap[numColumns];
+      Object[] colDefaultNullValues = new Object[numColumns];
+      for (int colId = 0; colId < numColumns; colId++) {
+        if (storedColumnDataTypes[colId] != ColumnDataType.OBJECT) {
+          // Store a dummy value that is both a valid numeric, and a valid hex 
encoded value.
+          String specialVal = "30";
+          colDefaultNullValues[colId] = 
storedColumnDataTypes[colId].toDataType().convert(specialVal);

Review Comment:
   We can store the default value per type as constants instead of calculating it on 
the fly. Using the default value for dimension fields should be good enough.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to