rdblue commented on code in PR #11857:
URL: https://github.com/apache/iceberg/pull/11857#discussion_r1927947249


##########
core/src/main/java/org/apache/iceberg/variants/VariantBuilderBase.java:
##########
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.variants;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+abstract class VariantBuilderBase {
+  protected static final int MAX_SHORT_STR_SIZE = 0x3F;
+
+  private final ByteBufferWrapper buffer;
+  private final Dictionary dict;
+  private int startPos;
+
+  VariantBuilderBase(ByteBufferWrapper buffer, Dictionary dict) {
+    this.buffer = buffer;
+    this.dict = dict;
+    startPos = buffer.pos;
+  }
+
+  /**
+   * Builds the variant metadata from `dictionaryKeys` and returns the 
resulting Variant object.
+   *
+   * @return The constructed Variant object.
+   */
+  public Variant build() {
+    int numKeys = dict.size();
+
+    // Calculate total size of dictionary strings
+    long numStringBytes = dict.totalBytes();
+    if (numStringBytes > VariantConstants.SIZE_LIMIT) {
+      throw new VariantSizeLimitException();
+    }
+
+    // Determine the number of bytes required for dictionary size and offset 
entry
+    int offsetSize = sizeOf(Math.max((int) numStringBytes, numKeys));
+
+    // metadata: header byte, dictionary size, offsets and string bytes
+    long metadataSize = 1 + offsetSize + (numKeys + 1) * offsetSize + 
numStringBytes;
+
+    // Ensure the metadata size is within limits
+    if (metadataSize > VariantConstants.SIZE_LIMIT) {
+      throw new VariantSizeLimitException();
+    }
+
+    ByteBufferWrapper metadataBuffer =
+        new ByteBufferWrapper((int) metadataSize, (int) metadataSize);
+
+    // Write header byte (version + offset size)
+    
metadataBuffer.addByte(VariantUtil.metadataHeader(VariantConstants.VERSION, 
offsetSize));
+
+    // Write number of keys
+    metadataBuffer.writeLittleEndianUnsigned(numKeys, offsetSize);
+
+    // Write offsets
+    int currentOffset = 0;
+    for (byte[] key : dict.getKeys()) {
+      metadataBuffer.writeLittleEndianUnsigned(currentOffset, offsetSize);
+      currentOffset += key.length;
+    }
+    metadataBuffer.writeLittleEndianUnsigned(numStringBytes, offsetSize);
+
+    // Write dictionary strings
+    dict.getKeys().forEach(metadataBuffer::addBytes);
+
+    return new VariantImpl(metadataBuffer.toByteArray(), buffer.toByteArray());
+  }
+
+  protected void writeNullInternal() {
+    buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_NULL));
+  }
+
+  protected void writeBooleanInternal(boolean value) {
+    buffer.addByte(
+        VariantUtil.primitiveHeader(
+            value ? Variants.Primitives.TYPE_TRUE : 
Variants.Primitives.TYPE_FALSE));
+  }
+
+  /**
+   * Writes a numeric value to the variant builder, automatically choosing the 
smallest type (INT8,
+   * INT16, INT32, or INT64) to store the value efficiently.
+   *
+   * @param value The numeric value to append.
+   */
+  protected void writeNumericInternal(long value) {
+    if (value == (byte) value) {
+      // INT8: Requires 1 byte for header + 1 byte for value
+      
buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT8));
+      buffer.writeLittleEndianUnsigned(value, 1);
+    } else if (value == (short) value) {
+      // INT16: Requires 1 byte for header + 2 bytes for value
+      
buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT16));
+      buffer.writeLittleEndianUnsigned(value, 2);
+    } else if (value == (int) value) {
+      // INT32: Requires 1 byte for header + 4 bytes for value
+      
buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT32));
+      buffer.writeLittleEndianUnsigned(value, 4);
+    } else {
+      // INT64: Requires 1 byte for header + 8 bytes for value
+      
buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT64));
+      buffer.writeLittleEndianUnsigned(value, 8);
+    }
+  }
+
+  protected void writeDoubleInternal(double value) {
+    
buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_DOUBLE));
+    buffer.writeLittleEndianUnsigned(Double.doubleToLongBits(value), 8);
+  }
+
+  /**
+   * Writes a decimal value to the variant builder, choosing the smallest 
decimal type (DECIMAL4,
+   * DECIMAL8, DECIMAL16) that fits its precision and scale.
+   */
+  public void writeDecimalInternal(BigDecimal value) {
+    Preconditions.checkArgument(
+        value.precision() <= VariantConstants.MAX_DECIMAL16_PRECISION,
+        "Unsupported Decimal precision: %s",
+        value.precision());
+
+    BigInteger unscaled = value.unscaledValue();
+    if (value.scale() <= VariantConstants.MAX_DECIMAL4_PRECISION
+        && value.precision() <= VariantConstants.MAX_DECIMAL4_PRECISION) {
+      
buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_DECIMAL4));
+      buffer.addByte((byte) value.scale());
+      buffer.writeLittleEndianUnsigned(unscaled.intValueExact(), 4);
+    } else if (value.scale() <= VariantConstants.MAX_DECIMAL8_PRECISION
+        && value.precision() <= VariantConstants.MAX_DECIMAL8_PRECISION) {
+      
buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_DECIMAL8));
+      buffer.addByte((byte) value.scale());
+      buffer.writeLittleEndianUnsigned(unscaled.longValueExact(), 8);
+    } else {
+      
buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_DECIMAL16));
+      buffer.addByte((byte) value.scale());
+      byte[] bytes = unscaled.toByteArray();
+      for (int i = 0; i < 16; i++) {
+        byte byteValue =
+            i < bytes.length ? bytes[bytes.length - 1 - i] : (byte) (bytes[0] 
< 0 ? -1 : 0);
+        buffer.addByte(byteValue);
+      }
+    }
+  }
+
+  protected void writeDateInternal(int daysSinceEpoch) {
+    buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_DATE));
+    buffer.writeLittleEndianUnsigned(daysSinceEpoch, 4);
+  }
+
+  /** Writes a timestamp with timezone (microseconds since epoch) to the 
variant builder. */
+  protected void writeTimestampTzInternal(long microsSinceEpoch) {
+    
buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_TIMESTAMPTZ));
+    buffer.writeLittleEndianUnsigned(microsSinceEpoch, 8);
+  }
+
+  /** Writes a timestamp without timezone (microseconds since epoch) to the 
variant builder. */
+  protected void writeTimestampNtzInternal(long microsSinceEpoch) {
+    
buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_TIMESTAMPNTZ));
+    buffer.writeLittleEndianUnsigned(microsSinceEpoch, 8);
+  }
+
+  protected void writeFloatInternal(float value) throws 
VariantSizeLimitException {
+    
buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_FLOAT));
+    buffer.writeLittleEndianUnsigned(Float.floatToIntBits(value), 4);
+  }
+
+  protected void writeBinaryInternal(byte[] value) throws 
VariantSizeLimitException {
+    
buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_BINARY));
+    buffer.writeLittleEndianUnsigned(value.length, 4);
+    buffer.addBytes(value);
+  }
+
+  protected void writeStringInternal(String value) {
+    byte[] text = value.getBytes(StandardCharsets.UTF_8);
+    boolean longStr = text.length > MAX_SHORT_STR_SIZE;
+
+    // Write header
+    if (longStr) {
+      
buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_STRING));
+      buffer.writeLittleEndianUnsigned(text.length, 4);
+    } else {
+      buffer.addByte(VariantUtil.shortStrHeader(text.length));
+    }
+
+    // Write string content
+    buffer.addBytes(text);
+  }
+
+  /** Choose the smallest number of bytes to store the given value. */
+  protected static int sizeOf(int maxValue) {
+    if (maxValue <= 0xFF) {
+      return 1;
+    } else if (maxValue <= 0xFFFF) {
+      return 2;
+    } else if (maxValue <= 0xFFFFFF) {
+      return 3;
+    }
+
+    return 4;
+  }
+
+  /**
+   * Completes writing an object to the buffer. Object fields are already 
written, and this method
+   * inserts header including header byte, number of elements, field IDs, and 
field offsets.
+   *
+   * @param objStartPos The starting position of the object data in the buffer.
+   * @param fields The list of field entries (key, ID, offset).
+   */
+  protected void endObject(int objStartPos, List<FieldEntry> fields) {
+    int numElements = fields.size();
+
+    // Sort fields by key and ensure no duplicate keys
+    Collections.sort(fields);
+    int maxId = numElements == 0 ? 0 : fields.get(0).id;
+    for (int i = 1; i < numElements; i++) {
+      maxId = Math.max(maxId, fields.get(i).id);
+      if (fields.get(i).key.equals(fields.get(i - 1).key)) {
+        throw new IllegalStateException("Duplicate key in Variant: " + 
fields.get(i).key);
+      }
+    }
+
+    int dataSize = buffer.pos - objStartPos; // Total byte size of the object 
values
+    boolean isLarge = numElements > 0xFF; // Determine whether to use large 
format
+    int sizeBytes = isLarge ? 4 : 1; // Number of bytes for the object size
+    int fieldIdSize = sizeOf(maxId); // Number of bytes for each field id
+    int fieldOffsetSize = sizeOf(dataSize); // Number of bytes for each field 
offset
+    int headerSize =
+        1 + sizeBytes + numElements * fieldIdSize + (numElements + 1) * 
fieldOffsetSize;
+
+    // Shift existing data to make room for header
+    buffer.shift(objStartPos, headerSize);
+
+    buffer.insertByte(
+        VariantUtil.objectHeader(isLarge, fieldIdSize, fieldOffsetSize),
+        objStartPos); // Insert header byte
+    buffer.insertLittleEndianUnsigned(
+        numElements, sizeBytes, objStartPos + 1); // Insert number of elements
+
+    // Insert field IDs and offsets
+    int fieldIdStart = objStartPos + 1 + sizeBytes;
+    int fieldOffsetStart = fieldIdStart + numElements * fieldIdSize;
+    for (int i = 0; i < numElements; i++) {
+      buffer.insertLittleEndianUnsigned(
+          fields.get(i).id, fieldIdSize, fieldIdStart + i * fieldIdSize);
+      buffer.insertLittleEndianUnsigned(
+          fields.get(i).offset, fieldOffsetSize, fieldOffsetStart + i * 
fieldOffsetSize);
+    }
+
+    // Insert the offset to the end of the data
+    buffer.insertLittleEndianUnsigned(
+        dataSize, fieldOffsetSize, fieldOffsetStart + numElements * 
fieldOffsetSize);
+  }
+
+  /**
+   * Completes writing an array to the buffer. Array values are already 
written, and this method
+   * inserts header including the header byte, number of elements, and field 
offsets.
+   *
+   * @param arrStartPos The starting position of the array values in the 
buffer.
+   * @param offsets The offsets for each array value.
+   */
+  protected void endArray(int arrStartPos, List<Integer> offsets) {
+    int dataSize = buffer.pos - arrStartPos; // Total byte size of the array 
values
+    int numElements = offsets.size();
+
+    boolean isLarge = numElements > 0xFF; // Determine whether to use large 
format
+    int sizeBytes = isLarge ? 4 : 1; // Number of bytes for the array size
+    int fieldOffsetSize = sizeOf(dataSize); // Number of bytes of each field 
offset
+    int headerSize = 1 + sizeBytes + (numElements + 1) * fieldOffsetSize; // 
header size
+    int offsetStart = arrStartPos + 1 + sizeBytes; // Start position for 
offsets
+
+    // Shift existing data to make room for header
+    buffer.shift(arrStartPos, headerSize);
+
+    buffer.insertByte(
+        VariantUtil.arrayHeader(isLarge, fieldOffsetSize), arrStartPos); // 
Insert header byte
+    buffer.insertLittleEndianUnsigned(
+        numElements, sizeBytes, arrStartPos + 1); // Insert number of elements
+
+    // Insert field offsets
+    for (int i = 0; i < numElements; i++) {
+      buffer.insertLittleEndianUnsigned(
+          offsets.get(i), fieldOffsetSize, offsetStart + i * fieldOffsetSize);
+    }
+
+    // Insert the offset to the end of the data
+    buffer.insertLittleEndianUnsigned(
+        dataSize, fieldOffsetSize, offsetStart + numElements * 
fieldOffsetSize);
+  }
+
+  protected ByteBufferWrapper getBuffer() {

Review Comment:
   Style: Iceberg does not use `get` in accessor names and discourages its use 
in general. If there's a more helpful verb then it is generally better to use 
it. Otherwise we usually omit `get`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to