@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.variants;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+abstract class VariantBuilderBase {
+  protected static final int MAX_SHORT_STR_SIZE = 0x3F; // longest UTF-8 byte length that writeStringInternal encodes with a short-string header
+  private final ByteBufferWrapper buffer; // receives the encoded value bytes
+  private final Dictionary dict; // key dictionary that build() serializes into the metadata
+  private int startPos; // buffer.pos captured when this builder was constructed
+  /**
+   * Creates a builder that writes values into {@code buffer} and uses {@code dict} as its key
+   * dictionary. The buffer position at construction time is remembered as the start position.
+   *
+   * @param buffer The buffer that receives the encoded value bytes.
+   * @param dict The key dictionary used when building the metadata.
+   */
+  VariantBuilderBase(ByteBufferWrapper buffer, Dictionary dict) {
+    this.dict = dict;
+    this.buffer = buffer;
+    this.startPos = buffer.pos;
+  }
+  /**
+   * Builds the variant metadata from the key dictionary and returns the resulting Variant object.
+   *
+   * <p>The metadata is written as a header byte, the number of keys, one start offset per key
+   * followed by a final end offset, and then the key bytes. The key count and every offset use
+   * the same byte width.
+   *
+   * @return The constructed Variant object.
+   * @throws VariantSizeLimitException if the key bytes or the whole metadata exceed the size limit
+   */
+  public Variant build() {
+    int numKeys = dict.size();
+    // Calculate total size of dictionary strings
+    long numStringBytes = dict.totalBytes();
+    if (numStringBytes > VariantConstants.SIZE_LIMIT) {
+      throw new VariantSizeLimitException();
+    }
+    // Determine the number of bytes required for dictionary size and offsets
+    int offsetSize = sizeOf(Math.max((int) numStringBytes, numKeys));
+    // metadata: header byte, dictionary size, offsets and string bytes
+    // (long arithmetic so the offset table size cannot overflow before the limit check)
+    long metadataSize = 1 + offsetSize + (numKeys + 1L) * offsetSize + numStringBytes;
+    // Ensure the metadata size is within limits
+    if (metadataSize > VariantConstants.SIZE_LIMIT) {
+      throw new VariantSizeLimitException();
+    }
+    ByteBufferWrapper metadataBuffer =
+        new ByteBufferWrapper((int) metadataSize, (int) metadataSize);
+    // Write header byte (version + offset size): version 1 in the low bits and
+    // (offsetSize - 1) in the top two bits, as laid out by the Variant encoding spec.
+    // NOTE(review): if the project has a version constant or a metadata-header helper, use it here.
+    metadataBuffer.addByte((byte) (0x01 | ((offsetSize - 1) << 6)));
+    // Write number of keys
+    metadataBuffer.writeLittleEndianUnsigned(numKeys, offsetSize);
+    // Write offsets
+    int currentOffset = 0;
+    for (byte[] key : dict.getKeys()) {
+      metadataBuffer.writeLittleEndianUnsigned(currentOffset, offsetSize);
+      currentOffset += key.length;
+    }
+    // Final offset marks the end of the last key
+    metadataBuffer.writeLittleEndianUnsigned(numStringBytes, offsetSize);
+    // Write dictionary strings
+    dict.getKeys().forEach(metadataBuffer::addBytes);
+    return new VariantImpl(metadataBuffer.toByteArray(), buffer.toByteArray());
+  }
+  /** Appends a variant null, which is written as a single primitive header byte. */
+  protected void writeNullInternal() {
+    buffer.addByte(
+        VariantUtil.primitiveHeader(
+            Variants.Primitives.TYPE_NULL));
+  }
+  /**
+   * Appends a boolean, which is written as a single primitive header byte whose type is either
+   * TYPE_TRUE or TYPE_FALSE.
+   *
+   * @param value The boolean value to append.
+   */
+  protected void writeBooleanInternal(boolean value) {
+    buffer.addByte(
+        VariantUtil.primitiveHeader(
+            value ? Variants.Primitives.TYPE_TRUE : Variants.Primitives.TYPE_FALSE));
+  }
+  /**
+   * Writes a numeric value to the variant builder, automatically choosing the smallest type (INT8,
+   * INT16, INT32, or INT64) to store the value efficiently. Each value is written as a primitive
+   * header byte followed by the value bytes.
+   *
+   * @param value The numeric value to append.
+   */
+  protected void writeNumericInternal(long value) {
+    // NOTE(review): the TYPE_INT* constants are assumed to sit next to TYPE_NULL/TYPE_DATE in
+    // Variants.Primitives -- confirm the names.
+    if (value == (byte) value) {
+      // INT8: Requires 1 byte for header + 1 byte for value
+      buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT8));
+      buffer.writeLittleEndianUnsigned(value, 1);
+    } else if (value == (short) value) {
+      // INT16: Requires 1 byte for header + 2 bytes for value
+      buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT16));
+      buffer.writeLittleEndianUnsigned(value, 2);
+    } else if (value == (int) value) {
+      // INT32: Requires 1 byte for header + 4 bytes for value
+      buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT32));
+      buffer.writeLittleEndianUnsigned(value, 4);
+    } else {
+      // INT64: Requires 1 byte for header + 8 bytes for value
+      buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT64));
+      buffer.writeLittleEndianUnsigned(value, 8);
+    }
+  }
+  /**
+   * Writes a double to the variant builder as a primitive header byte followed by the 8 bytes of
+   * its {@link Double#doubleToLongBits(double)} representation.
+   *
+   * @param value The double value to append.
+   */
+  protected void writeDoubleInternal(double value) {
+    // NOTE(review): TYPE_DOUBLE is assumed to exist in Variants.Primitives -- confirm the name.
+    buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_DOUBLE));
+    buffer.writeLittleEndianUnsigned(Double.doubleToLongBits(value), 8);
+  }
+  /**
+   * Writes a decimal value to the variant builder, choosing the smallest decimal type (DECIMAL4,
+   * DECIMAL8, DECIMAL16) that fits its precision and scale. Each value is written as a primitive
+   * header byte, one scale byte, and then the unscaled value in 4, 8, or 16 bytes.
+   *
+   * @param value The decimal value to append.
+   * @throws IllegalArgumentException if the precision exceeds the DECIMAL16 maximum
+   */
+  public void writeDecimalInternal(BigDecimal value) {
+    Preconditions.checkArgument(
+        value.precision() <= VariantConstants.MAX_DECIMAL16_PRECISION,
+        "Unsupported Decimal precision: %s",
+        value.precision());
+    BigInteger unscaled = value.unscaledValue();
+    // NOTE(review): the TYPE_DECIMAL* constants are assumed to exist in Variants.Primitives --
+    // confirm the names.
+    if (value.scale() <= VariantConstants.MAX_DECIMAL4_PRECISION
+        && value.precision() <= VariantConstants.MAX_DECIMAL4_PRECISION) {
+      buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_DECIMAL4));
+      buffer.addByte((byte) value.scale());
+      buffer.writeLittleEndianUnsigned(unscaled.intValueExact(), 4);
+    } else if (value.scale() <= VariantConstants.MAX_DECIMAL8_PRECISION
+        && value.precision() <= VariantConstants.MAX_DECIMAL8_PRECISION) {
+      buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_DECIMAL8));
+      buffer.addByte((byte) value.scale());
+      buffer.writeLittleEndianUnsigned(unscaled.longValueExact(), 8);
+    } else {
+      buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_DECIMAL16));
+      buffer.addByte((byte) value.scale());
+      // toByteArray() is big-endian two's complement; emit it reversed (little-endian) and pad
+      // the remaining high-order bytes with the sign so the value always occupies 16 bytes.
+      byte[] bytes = unscaled.toByteArray();
+      byte signFill = (byte) (bytes[0] < 0 ? -1 : 0);
+      for (int i = 0; i < 16; i++) {
+        byte byteValue = i < bytes.length ? bytes[bytes.length - 1 - i] : signFill;
+        buffer.addByte(byteValue);
+      }
+    }
+  }
+  /**
+   * Appends a date as the TYPE_DATE primitive header byte followed by the day count in 4 bytes.
+   *
+   * @param daysSinceEpoch The date value to append.
+   */
+  protected void writeDateInternal(int daysSinceEpoch) {
+    buffer.addByte(
+        VariantUtil.primitiveHeader(Variants.Primitives.TYPE_DATE));
+    buffer.writeLittleEndianUnsigned(
+        daysSinceEpoch, 4);
+  }
+  /**
+   * Writes a timestamp with timezone (microseconds since epoch) to the variant builder as a
+   * primitive header byte followed by the 8-byte value.
+   *
+   * @param microsSinceEpoch The timestamp value to append.
+   */
+  protected void writeTimestampTzInternal(long microsSinceEpoch) {
+    // NOTE(review): TYPE_TIMESTAMPTZ is assumed to exist in Variants.Primitives -- confirm.
+    buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_TIMESTAMPTZ));
+    buffer.writeLittleEndianUnsigned(microsSinceEpoch, 8);
+  }
+  /**
+   * Writes a timestamp without timezone (microseconds since epoch) to the variant builder as a
+   * primitive header byte followed by the 8-byte value.
+   *
+   * @param microsSinceEpoch The timestamp value to append.
+   */
+  protected void writeTimestampNtzInternal(long microsSinceEpoch) {
+    // NOTE(review): TYPE_TIMESTAMPNTZ is assumed to exist in Variants.Primitives -- confirm.
+    buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_TIMESTAMPNTZ));
+    buffer.writeLittleEndianUnsigned(microsSinceEpoch, 8);
+  }
+  /**
+   * Writes a float to the variant builder as a primitive header byte followed by the 4 bytes of
+   * its {@link Float#floatToIntBits(float)} representation.
+   *
+   * @param value The float value to append.
+   */
+  protected void writeFloatInternal(float value) throws VariantSizeLimitException {
+    // NOTE(review): TYPE_FLOAT is assumed to exist in Variants.Primitives -- confirm the name.
+    buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_FLOAT));
+    buffer.writeLittleEndianUnsigned(Float.floatToIntBits(value), 4);
+  }
+  /**
+   * Writes a binary value to the variant builder as a primitive header byte, a 4-byte length, and
+   * then the raw bytes.
+   *
+   * @param value The bytes to append.
+   */
+  protected void writeBinaryInternal(byte[] value) throws VariantSizeLimitException {
+    // NOTE(review): TYPE_BINARY is assumed to exist in Variants.Primitives -- confirm the name.
+    buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_BINARY));
+    buffer.writeLittleEndianUnsigned(value.length, 4);
+    buffer.addBytes(value);
+  }
+  /**
+   * Writes a string to the variant builder as UTF-8. Strings of at most {@code
+   * MAX_SHORT_STR_SIZE} bytes use a one-byte short-string header that carries the length; longer
+   * strings use a primitive header byte followed by a 4-byte length.
+   *
+   * @param value The string to append.
+   */
+  protected void writeStringInternal(String value) {
+    byte[] text = value.getBytes(StandardCharsets.UTF_8);
+    boolean longStr = text.length > MAX_SHORT_STR_SIZE;
+    // Write header
+    if (longStr) {
+      // NOTE(review): TYPE_STRING is assumed to exist in Variants.Primitives -- confirm the name.
+      buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_STRING));
+      buffer.writeLittleEndianUnsigned(text.length, 4);
+    } else {
+      buffer.addByte(VariantUtil.shortStrHeader(text.length));
+    }
+    // Write string content
+    buffer.addBytes(text);
+  }
+  /**
+   * Picks the number of bytes (1 to 4) used to store the given value. Anything at or below 0xFF,
+   * including negative input, maps to 1.
+   */
+  protected static int sizeOf(int maxValue) {
+    if (maxValue > 0xFFFFFF) {
+      return 4;
+    }
+    if (maxValue > 0xFFFF) {
+      return 3;
+    }
+    return maxValue > 0xFF ? 2 : 1;
+  }
+  /**
+   * Completes writing an object to the buffer. Object fields are already written, and this method
+   * inserts header including header byte, number of elements, field IDs, and field offsets.
+   *
+   * <p>The given list is sorted in place by key before the header is written.
+   *
+   * @param objStartPos The starting position of the object data in the buffer.
+   * @param fields The list of field entries (key, ID, offset).
+   * @throws IllegalStateException if two fields share the same key
+   */
+  protected void endObject(int objStartPos, List<FieldEntry> fields) {
+    int numElements = fields.size();
+    // Sort fields by key and ensure no duplicate keys
+    Collections.sort(fields);
+    int maxId = numElements == 0 ? 0 : fields.get(0).id;
+    for (int i = 1; i < numElements; i++) {
+      maxId = Math.max(maxId, fields.get(i).id);
+      // After sorting, equal keys are adjacent
+      if (fields.get(i).key.equals(fields.get(i - 1).key)) {
+        throw new IllegalStateException("Duplicate key in Variant: " + fields.get(i).key);
+      }
+    }
+    int dataSize = buffer.pos - objStartPos; // Total byte size of the object data
+    boolean isLarge = numElements > 0xFF; // Determine whether to use large format
+    int sizeBytes = isLarge ? 4 : 1; // Number of bytes for the object size
+    int fieldIdSize = sizeOf(maxId); // Number of bytes for each field id
+    int fieldOffsetSize = sizeOf(dataSize); // Number of bytes for each field offset
+    // header byte + element count + one id per field + one offset per field plus the end offset
+    int headerSize =
+        1 + sizeBytes + numElements * fieldIdSize + (numElements + 1) * fieldOffsetSize;
+    // Shift existing data to make room for header
+    buffer.shift(objStartPos, headerSize);
+    buffer.insertByte(
+        VariantUtil.objectHeader(isLarge, fieldIdSize, fieldOffsetSize),
+        objStartPos); // Insert header byte
+    buffer.insertLittleEndianUnsigned(
+        numElements, sizeBytes, objStartPos + 1); // Insert number of elements
+    // Insert field IDs and offsets
+    int fieldIdStart = objStartPos + 1 + sizeBytes;
+    int fieldOffsetStart = fieldIdStart + numElements * fieldIdSize;
+    for (int i = 0; i < numElements; i++) {
+      buffer.insertLittleEndianUnsigned(
+          fields.get(i).id, fieldIdSize, fieldIdStart + i * fieldIdSize);
+      buffer.insertLittleEndianUnsigned(
+          fields.get(i).offset, fieldOffsetSize, fieldOffsetStart + i * fieldOffsetSize);
+    }
+    // Insert the offset to the end of the data
+    buffer.insertLittleEndianUnsigned(
+        dataSize, fieldOffsetSize, fieldOffsetStart + numElements * fieldOffsetSize);
+  }
+  /**
+   * Completes writing an array to the buffer. Array values are already written, and this method
+   * inserts header including the header byte, number of elements, and field offsets.
+   *
+   * @param arrStartPos The starting position of the array values in the buffer.
+   * @param offsets The offsets for each array value.
+   */
+  protected void endArray(int arrStartPos, List<Integer> offsets) {
+    int dataSize = buffer.pos - arrStartPos; // Total byte size of the array values
+    int numElements = offsets.size();
+    boolean isLarge = numElements > 0xFF; // Determine whether to use large format
+    int sizeBytes = isLarge ? 4 : 1; // Number of bytes for the array size
+    int fieldOffsetSize = sizeOf(dataSize); // Number of bytes of each field offset
+    // header byte + element count + one offset per element plus the end offset
+    int headerSize = 1 + sizeBytes + (numElements + 1) * fieldOffsetSize;
+    int offsetStart = arrStartPos + 1 + sizeBytes; // Start position for offsets
+    // Shift existing data to make room for header
+    buffer.shift(arrStartPos, headerSize);
+    buffer.insertByte(
+        VariantUtil.arrayHeader(isLarge, fieldOffsetSize), arrStartPos); // Insert header byte
+    buffer.insertLittleEndianUnsigned(
+        numElements, sizeBytes, arrStartPos + 1); // Insert number of elements
+    // Insert field offsets
+    for (int i = 0; i < numElements; i++) {
+      buffer.insertLittleEndianUnsigned(
+          offsets.get(i), fieldOffsetSize, offsetStart + i * fieldOffsetSize);
+    }
+    // Insert the offset to the end of the data
+    buffer.insertLittleEndianUnsigned(
+        dataSize, fieldOffsetSize, offsetStart + numElements * fieldOffsetSize);
+  }
+  protected ByteBufferWrapper getBuffer() {

Review Comment:
   Style: Iceberg does not use `get` in accessor names and discourages its use 
in general. If there's a more helpful verb then it is generally better to use 
it. Otherwise we usually omit `get`.

