gortiz commented on code in PR #13303:
URL: https://github.com/apache/pinot/pull/13303#discussion_r1738697999


##########
pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java:
##########
@@ -187,200 +147,321 @@ public static RowDataBlock buildFromRows(List<Object[]> 
rows, DataSchema dataSch
                     dataSchema.getColumnName(colId)));
         }
       }
-      rowBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 
0, byteBuffer.position());
     }
+
+    CompoundDataBuffer.Builder varBufferBuilder = new 
CompoundDataBuffer.Builder(ByteOrder.BIG_ENDIAN, true)
+        .addPagedOutputStream(varSize);
+
     // Write null bitmaps after writing data.
-    for (RoaringBitmap nullBitmap : nullBitmaps) {
-      rowBuilder.setNullRowIds(nullBitmap);
-    }
-    return buildRowBlock(rowBuilder);
+    setNullRowIds(nullBitmaps, fixedSize, varBufferBuilder);
+    return buildRowBlock(numRows, dataSchema, 
getReverseDictionary(dictionary), fixedSize, varBufferBuilder);
   }
 
+
   public static ColumnarDataBlock buildFromColumns(List<Object[]> columns, 
DataSchema dataSchema)
       throws IOException {
+    return buildFromColumns(columns, dataSchema, 
PagedPinotOutputStream.HeapPageAllocator.createSmall());
+  }
+
+  public static ColumnarDataBlock buildFromColumns(List<Object[]> columns, 
DataSchema dataSchema,
+      PagedPinotOutputStream.PageAllocator allocator)
+      throws IOException {
     int numRows = columns.isEmpty() ? 0 : columns.get(0).length;
-    DataBlockBuilder columnarBuilder = new DataBlockBuilder(dataSchema, 
DataBlock.Type.COLUMNAR, numRows);
+
+    int fixedBytesPerRow = calculateBytesPerRow(dataSchema);
+    int nullFixedBytes = dataSchema.size() * Integer.BYTES * 2;
+    int fixedBytesRequired = fixedBytesPerRow * numRows + nullFixedBytes;
+
+    Object2IntOpenHashMap<String> dictionary = new Object2IntOpenHashMap<>();
+
     // TODO: consolidate these null utils into data table utils.
     // Selection / Agg / Distinct all have similar code.
-    ColumnDataType[] storedTypes = dataSchema.getStoredColumnDataTypes();
-    int numColumns = storedTypes.length;
+    int numColumns = dataSchema.size();
+
     RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
-    Object[] nullPlaceholders = new Object[numColumns];
-    for (int colId = 0; colId < numColumns; colId++) {
-      nullBitmaps[colId] = new RoaringBitmap();
-      nullPlaceholders[colId] = storedTypes[colId].getNullPlaceholder();
+    ByteBuffer fixedSize = ByteBuffer.allocate(fixedBytesRequired);
+    CompoundDataBuffer.Builder varBufferBuilder = new 
CompoundDataBuffer.Builder(ByteOrder.BIG_ENDIAN, true);
+
+    try (PagedPinotOutputStream varSize = new 
PagedPinotOutputStream(allocator)) {
+      for (int colId = 0; colId < numColumns; colId++) {
+        RoaringBitmap nullBitmap = new RoaringBitmap();
+        nullBitmaps[colId] = nullBitmap;
+        serializeColumnData(columns, dataSchema, colId, fixedSize, varSize, 
nullBitmap, dictionary);
+      }
+      varBufferBuilder.addPagedOutputStream(varSize);
     }
-    for (int colId = 0; colId < numColumns; colId++) {
-      Object[] column = columns.get(colId);
-      ByteBuffer byteBuffer = ByteBuffer.allocate(numRows * 
columnarBuilder._columnSizeInBytes[colId]);
-      Object value;
-
-      // NOTE:
-      // We intentionally make the type casting very strict here (e.g. only 
accepting Integer for INT) to ensure the
-      // rows conform to the data schema. This can help catch the unexpected 
data type issues early.
-      switch (storedTypes[colId]) {
-        // Single-value column
-        case INT:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            byteBuffer.putInt((int) value);
+    // Write null bitmaps after writing data.
+    setNullRowIds(nullBitmaps, fixedSize, varBufferBuilder);
+    return buildColumnarBlock(numRows, dataSchema, 
getReverseDictionary(dictionary), fixedSize, varBufferBuilder);
+  }
+
+  private static void serializeColumnData(List<Object[]> columns, DataSchema 
dataSchema, int colId,
+      ByteBuffer fixedSize, PagedPinotOutputStream varSize, RoaringBitmap 
nullBitmap,
+      Object2IntOpenHashMap<String> dictionary)
+      throws IOException {
+    ColumnDataType storedType = 
dataSchema.getColumnDataType(colId).getStoredType();
+    int numRows = columns.get(colId).length;
+
+    Object[] column = columns.get(colId);
+
+    // NOTE:
+    // We intentionally make the type casting very strict here (e.g. only 
accepting Integer for INT) to ensure the
+    // rows conform to the data schema. This can help catch the unexpected 
data type issues early.
+    switch (storedType) {
+      // Single-value column
+      case INT: {
+        int nullPlaceholder = (int) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            fixedSize.putInt(nullPlaceholder);
+          } else {
+            fixedSize.putInt((int) value);
           }
-          break;
-        case LONG:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            byteBuffer.putLong((long) value);
+        }
+        break;
+      }
+      case LONG: {
+        long nullPlaceholder = (long) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            fixedSize.putLong(nullPlaceholder);
+          } else {
+            fixedSize.putLong((long) value);
           }
-          break;
-        case FLOAT:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            byteBuffer.putFloat((float) value);
+        }
+        break;
+      }
+      case FLOAT: {
+        float nullPlaceholder = (float) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            fixedSize.putFloat(nullPlaceholder);
+          } else {
+            fixedSize.putFloat((float) value);
           }
-          break;
-        case DOUBLE:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            byteBuffer.putDouble((double) value);
+        }
+        break;
+      }
+      case DOUBLE: {
+        double nullPlaceholder = (double) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            fixedSize.putDouble(nullPlaceholder);
+          } else {
+            fixedSize.putDouble((double) value);
           }
-          break;
-        case BIG_DECIMAL:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (BigDecimal) value);
+        }
+        break;
+      }
+      case BIG_DECIMAL: {
+        BigDecimal nullPlaceholder = (BigDecimal) 
storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            setColumn(fixedSize, varSize, nullPlaceholder);
+          } else {
+            setColumn(fixedSize, varSize, (BigDecimal) value);
           }
-          break;
-        case STRING:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (String) value);
+        }
+        break;
+      }
+      case STRING: {
+        ToIntFunction<String> didSupplier = k -> dictionary.size();
+        int nullPlaceHolder = dictionary.computeIfAbsent((String) 
storedType.getNullPlaceholder(), didSupplier);
+
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            fixedSize.putInt(nullPlaceHolder);
+          } else {
+            int dictId = dictionary.computeIfAbsent((String) value, 
didSupplier);
+            fixedSize.putInt(dictId);
           }
-          break;
-        case BYTES:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (ByteArray) value);
+        }
+        break;
+      }
+      case BYTES: {
+        ByteArray nullPlaceholder = (ByteArray) 
storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            setColumn(fixedSize, varSize, nullPlaceholder);
+          } else {
+            setColumn(fixedSize, varSize, (ByteArray) value);
           }
-          break;
+        }
+        break;
+      }
 
-        // Multi-value column
-        case INT_ARRAY:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (int[]) value);
+      // Multi-value column
+      case INT_ARRAY: {
+        int[] nullPlaceholder = (int[]) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            setColumn(fixedSize, varSize, nullPlaceholder);
+          } else {
+            setColumn(fixedSize, varSize, (int[]) value);
           }
-          break;
-        case LONG_ARRAY:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (long[]) value);
+        }
+        break;
+      }
+      case LONG_ARRAY: {
+        long[] nullPlaceholder = (long[]) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            setColumn(fixedSize, varSize, nullPlaceholder);
+          } else {
+            setColumn(fixedSize, varSize, (long[]) value);
           }
-          break;
-        case FLOAT_ARRAY:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (float[]) value);
+        }
+        break;
+      }
+      case FLOAT_ARRAY: {
+        float[] nullPlaceholder = (float[]) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            setColumn(fixedSize, varSize, nullPlaceholder);
+          } else {
+            setColumn(fixedSize, varSize, (float[]) value);
           }
-          break;
-        case DOUBLE_ARRAY:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (double[]) value);
+        }
+        break;
+      }
+      case DOUBLE_ARRAY: {
+        double[] nullPlaceholder = (double[]) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            setColumn(fixedSize, varSize, nullPlaceholder);
+          } else {
+            setColumn(fixedSize, varSize, (double[]) value);
           }
-          break;
-        case STRING_ARRAY:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (String[]) value);
+        }
+        break;
+      }
+      case STRING_ARRAY: {
+        String[] nullPlaceholder = (String[]) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            setColumn(fixedSize, varSize, nullPlaceholder, dictionary);
+          } else {
+            setColumn(fixedSize, varSize, (String[]) value, dictionary);
           }
-          break;
+        }
+        break;
+      }
 
-        // Special intermediate result for aggregation function
-        case OBJECT:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            setColumn(columnarBuilder, byteBuffer, column[rowId]);
-          }
-          break;
+      // Special intermediate result for aggregation function
+      case OBJECT: {
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          setColumn(fixedSize, varSize, column[rowId]);
+        }
+        break;
+      }
+      // Null
+      case UNKNOWN:
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          setColumn(fixedSize, varSize, (Object) null);
+        }
+        break;
 
-        // Null
-        case UNKNOWN:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            setColumn(columnarBuilder, byteBuffer, (Object) null);
-          }
-          break;
+      default:
+        throw new IllegalStateException(
+            String.format("Unsupported stored type: %s for column: %s", 
storedType,
+                dataSchema.getColumnName(colId)));
+    }
+  }
 
+  private static int calculateBytesPerRow(DataSchema dataSchema) {
+    int rowSizeInBytes = 0;
+    for (ColumnDataType columnDataType : dataSchema.getColumnDataTypes()) {
+      switch (columnDataType) {
+        case INT:
+          rowSizeInBytes += 4;
+          break;
+        case LONG:
+          rowSizeInBytes += 8;
+          break;
+        case FLOAT:
+          rowSizeInBytes += 4;
+          break;
+        case DOUBLE:
+          rowSizeInBytes += 8;
+          break;
+        case STRING:
+          rowSizeInBytes += 4;
+          break;
+        // Object and array. (POSITION|LENGTH)
         default:
-          throw new IllegalStateException(
-              String.format("Unsupported stored type: %s for column: %s", 
storedTypes[colId],
-                  dataSchema.getColumnName(colId)));
+          rowSizeInBytes += 8;
+          break;
       }
-      
columnarBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 
0, byteBuffer.position());
     }
-    // Write null bitmaps after writing data.
-    for (RoaringBitmap nullBitmap : nullBitmaps) {
-      columnarBuilder.setNullRowIds(nullBitmap);
+    return rowSizeInBytes;
+  }
+
+  private static void writeVarOffsetInFixed(ByteBuffer fixedSize, 
PagedPinotOutputStream varSize) {
+    long offsetInVar = varSize.getCurrentOffset();
+    Preconditions.checkState(offsetInVar <= Integer.MAX_VALUE,
+        "Cannot handle variable size output stream larger than 2GB");
+    fixedSize.putInt((int) offsetInVar);
+  }
+
+  private static void setNullRowIds(RoaringBitmap[] nullVectors, ByteBuffer 
fixedSize,
+      CompoundDataBuffer.Builder varBufferBuilder)
+      throws IOException {
+    int varBufSize = Arrays.stream(nullVectors)
+        .mapToInt(bitmap -> bitmap == null ? 0 : 
bitmap.serializedSizeInBytes())
+        .sum();
+    ByteBuffer variableSize = ByteBuffer.allocate(varBufSize)
+        .order(ByteOrder.BIG_ENDIAN);
+
+    long varWrittenBytes = varBufferBuilder.getWrittenBytes();
+    Preconditions.checkArgument(varWrittenBytes < Integer.MAX_VALUE,
+        "Cannot handle variable size output stream larger than 2GB but found 
{} written bytes", varWrittenBytes);
+    int startVariableOffset = (int) varWrittenBytes;
+    for (RoaringBitmap nullRowIds : nullVectors) {
+      int writtenVarBytes = variableSize.position();
+      fixedSize.putInt(startVariableOffset + writtenVarBytes);
+      if (nullRowIds == null || nullRowIds.isEmpty()) {
+        fixedSize.putInt(0);
+      } else {
+        RoaringBitmapUtils.serialize(nullRowIds, variableSize);
+        fixedSize.putInt(variableSize.position() - writtenVarBytes);
+      }
     }
-    return buildColumnarBlock(columnarBuilder);
+    varBufferBuilder.addBuffer(variableSize);
   }
 
-  private static RowDataBlock buildRowBlock(DataBlockBuilder builder) {
-    return new RowDataBlock(builder._numRows, builder._dataSchema, 
getReverseDictionary(builder._dictionary),
-        builder._fixedSizeDataByteArrayOutputStream.toByteArray(),
-        builder._variableSizeDataByteArrayOutputStream.toByteArray());
+  private static RowDataBlock buildRowBlock(int numRows, DataSchema 
dataSchema, String[] dictionary,
+      ByteBuffer fixedSize, CompoundDataBuffer.Builder varBufferBuilder) {
+    return new RowDataBlock(numRows, dataSchema, dictionary, 
PinotByteBuffer.wrap(fixedSize), varBufferBuilder.build());
   }
 
-  private static ColumnarDataBlock buildColumnarBlock(DataBlockBuilder 
builder) {
-    return new ColumnarDataBlock(builder._numRows, builder._dataSchema, 
getReverseDictionary(builder._dictionary),
-        builder._fixedSizeDataByteArrayOutputStream.toByteArray(),
-        builder._variableSizeDataByteArrayOutputStream.toByteArray());
+  private static ColumnarDataBlock buildColumnarBlock(int numRows, DataSchema 
dataSchema, String[] dictionary,
+      ByteBuffer fixedSize, CompoundDataBuffer.Builder varBufferBuilder) {
+    return new ColumnarDataBlock(numRows, dataSchema, dictionary,
+        PinotByteBuffer.wrap(fixedSize), varBufferBuilder.build());

Review Comment:
   What do you mean? Do you want me to write it on a single line, like this?
   
   ```
   return new ColumnarDataBlock(numRows, dataSchema, dictionary, PinotByteBuffer.wrap(fixedSize), varBufferBuilder.build());
   ```
   
   If so, we cannot do that: the resulting line would be 126 characters long, which exceeds the maximum allowed line length.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to