XuQianJin-Stars opened a new pull request, #2875:
URL: https://github.com/apache/fluss/pull/2875

   
   ### Purpose
   
   <!-- Linking this pull request to the issue -->
   Linked issue: close #2873
   
   <!-- What is the purpose of the change -->
   Introduce a first-class `Variant` data type throughout Fluss's row 
infrastructure, replacing the previous opaque `byte[]` representation. This 
aligns Fluss with the [Variant Binary Encoding 
spec](https://github.com/apache/parquet-format/blob/master/VariantEncoding.md) 
and follows the same design pattern as Apache Paimon's Variant implementation, 
providing structured `value()` and `metadata()` access for semi-structured data.
   
   
   ### Brief change log
   
   <!-- Please describe the changes made in this pull request and explain how 
they address the issue -->
   #### 1. Core Types (`fluss-common`)
   
   - **New `Variant` interface** (`fluss-common/.../row/Variant.java`): Defines 
`value()`, `metadata()`, `sizeInBytes()`, `copy()` accessors, plus static 
helpers `bytesToVariant(byte[])` / `variantToBytes(Variant)` for 
backward-compatible wire format conversion.
   - **New `GenericVariant` class** 
(`fluss-common/.../row/GenericVariant.java`): Default implementation storing 
two `byte[]` fields (value + metadata) with proper `equals()`, `hashCode()`, 
`toString()`.
   - **New `VariantType`** (`fluss-common/.../types/VariantType.java`): Adds 
`VARIANT` to Fluss's type system with `DataTypeRoot.VARIANT` enum entry, parser 
support, JSON serde, and visitor pattern integration.
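
As a rough illustration of the accessors listed above, the following is a minimal sketch of a two-field variant value type. The names `SketchVariant` and `SketchGenericVariant` are hypothetical stand-ins, not the classes in this PR, and the `long` return type of `sizeInBytes()` is an assumption. The point it shows is that `equals()` and `hashCode()` over `byte[]` fields need content-based comparison via `java.util.Arrays`.

```java
import java.util.Arrays;

/** Hypothetical stand-in for the Variant interface described above. */
interface SketchVariant {
    byte[] value();

    byte[] metadata();

    long sizeInBytes();

    SketchVariant copy();
}

/** Hypothetical stand-in for GenericVariant: two byte arrays compared by content. */
final class SketchGenericVariant implements SketchVariant {
    private final byte[] value;
    private final byte[] metadata;

    SketchGenericVariant(byte[] value, byte[] metadata) {
        this.value = value;
        this.metadata = metadata;
    }

    @Override
    public byte[] value() {
        return value;
    }

    @Override
    public byte[] metadata() {
        return metadata;
    }

    // Assumption: the size is simply the sum of both payload lengths.
    @Override
    public long sizeInBytes() {
        return (long) value.length + metadata.length;
    }

    // Deep copy, so the result does not share arrays with the original.
    @Override
    public SketchVariant copy() {
        return new SketchGenericVariant(value.clone(), metadata.clone());
    }

    // Arrays.equals compares contents; byte[].equals would compare references.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SketchGenericVariant)) {
            return false;
        }
        SketchGenericVariant that = (SketchGenericVariant) o;
        return Arrays.equals(value, that.value) && Arrays.equals(metadata, that.metadata);
    }

    @Override
    public int hashCode() {
        return 31 * Arrays.hashCode(value) + Arrays.hashCode(metadata);
    }

    @Override
    public String toString() {
        return "Variant{value=" + Arrays.toString(value)
                + ", metadata=" + Arrays.toString(metadata) + "}";
    }
}
```

With this sketch, two instances built from separate but identical arrays compare equal and hash identically, which is what row-level comparisons in tests would typically rely on.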
   
   #### 2. Row Infrastructure (`fluss-common`)
   
   - **`DataGetters`**: Added `Variant getVariant(int pos)` abstract method.
   - **All `InternalRow` implementations** (`GenericRow`, `BinaryRow` via 
`AlignedRow`, `CompactedRow`, `IndexedRow`, `ProjectedRow`, `PaddingRow`, 
`ColumnarRow`) now implement `getVariant()`.
   - **All `InternalArray` implementations** (`GenericArray`, `BinaryArray`, 
`ColumnarArray`) now implement `getVariant()`.
   - **Binary writers**: `BinaryWriter`, `AbstractBinaryWriter`, 
`BinaryArrayWriter`, `SequentialBinaryWriter` — added `writeVariant(int pos, 
Variant)`.
   - **Compacted/Indexed readers & writers**: `CompactedRowReader/Writer`, 
`IndexedRowReader/Writer` — added `readVariant()` / `writeVariant()`.
   - **Arrow integration**: New `ArrowVariantColumnVector` (reader) and 
`ArrowVariantWriter` (writer) for Arrow-based columnar storage; `ArrowUtils` 
updated.
   
   #### 3. Client Converters (`fluss-client`)
   
   - `PojoToRowConverter`: Added VARIANT handling, supporting both `byte[]` and 
`Variant` inputs.
   - `RowToPojoConverter`: Added VARIANT → `byte[]` conversion.
   
   #### 4. Flink Bridge (`fluss-flink`)
   
   - `FlussTypeToFlinkType`: Maps `VARIANT` → Flink `BYTES` (with TODO for 
future native VariantType support).
   - `FlussRowToFlinkRowConverter`: Converts `Variant` → `byte[]` via 
`variantToBytes()`.
   - `FlussRowToJsonConverters`: Serializes `Variant` as binary JSON node.
   - `FlinkAsFlussRow` / `FlinkAsFlussArray`: Implement `getVariant()` by 
deserializing from Flink's `byte[]`.
   - `PojoToRowConverter` (Flink): Added VARIANT handling.
   - `RowDataSerializationSchema`: Added VARIANT case.
   
   #### 5. Spark Bridge (`fluss-spark`)
   
   - `FlussToSparkTypeVisitor`: Maps `VARIANT` → Spark `BinaryType` (with TODO 
for Spark 4 native VariantType via shim mechanism).
   - `SparkAsFlussRow` / `SparkAsFlussArray`: Implement `getVariant()` by 
deserializing from Spark's `byte[]`.
   
   #### 6. Lake Connectors (`fluss-lake`)
   
   - **Paimon**: `FlussDataTypeToPaimonDataType` maps VARIANT; 
`PaimonRowAsFlussRow` / `PaimonArrayAsFlussArray` implement `getVariant()` 
(with TODO for native Paimon Variant once dependency is upgraded).
   - **Iceberg**: `FlussDataTypeToIcebergDataType` maps VARIANT; 
`IcebergRecordAsFlussRow` / `IcebergArrayAsFlussArray` implement `getVariant()`.
   - **Lance**: `LanceArrowUtils`, `ArrowDataConverter`, 
`ShadedArrowBatchWriter` updated with VARIANT support.
   
   #### 7. Utilities
   
   - `InternalRowUtils`, `TypeUtils`, `PartitionUtils` — added VARIANT handling.
   - `DataTypeJsonSerde` — VARIANT serialization/deserialization.
   
   
   ### Tests
   
   <!-- List UT and IT cases to verify this change -->
   - `DataTypeVisitorTest`: Updated to cover `VariantType` in the type visitor 
test.
   - `InternalArrayAssert`: Added `getVariant()` assertion support for test 
utilities.
   - All modules (fluss-common, fluss-flink, fluss-spark, fluss-lake-paimon, 
fluss-lake-iceberg, fluss-lake-lance) compile with `mvn compile -DskipTests`. 
Note that this command skips test execution, so it verifies the build only.
   
   ### API and Format
   
   <!-- Does this change affect API or storage format -->
   - **New public API**: `Variant` interface and `GenericVariant` class in 
`fluss-common`.
   - **New data type**: `VARIANT` added to `DataTypeRoot` and `DataTypes`.
   - **Storage format**: No change. The on-wire binary format remains `[4-byte 
value length (big-endian)][value bytes][metadata bytes]`. The 
`Variant.variantToBytes()` / `Variant.bytesToVariant()` methods handle the 
conversion, ensuring backward compatibility.
   - **Engine mappings**: VARIANT is mapped to `BYTES` / `BinaryType` in Flink 
and Spark respectively (degraded mapping due to lack of native engine support), 
and to the corresponding binary types in lake formats.
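
The wire layout stated above, `[4-byte value length (big-endian)][value bytes][metadata bytes]`, can be sketched as follows. This is an illustrative stand-alone encoder/decoder, not the code in this PR: the class `VariantWireSketch` and its `encode` / `decode` methods are hypothetical names, and the bounds checks are an assumption about reasonable validation.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical helper illustrating the length-prefixed layout described above. */
final class VariantWireSketch {
    private VariantWireSketch() {}

    /** Encodes as [4-byte big-endian value length][value bytes][metadata bytes]. */
    static byte[] encode(byte[] value, byte[] metadata) {
        // ByteBuffer defaults to big-endian byte order, matching the stated layout.
        ByteBuffer buf = ByteBuffer.allocate(4 + value.length + metadata.length);
        buf.putInt(value.length);
        buf.put(value);
        buf.put(metadata);
        return buf.array();
    }

    /** Decodes into {value, metadata}; metadata is everything after the value bytes. */
    static byte[][] decode(byte[] bytes) {
        if (bytes.length < 4) {
            throw new IllegalArgumentException("missing 4-byte length prefix");
        }
        int valueLength = ByteBuffer.wrap(bytes).getInt();
        // Assumed validation: the declared length must fit in the remaining bytes.
        if (valueLength < 0 || valueLength > bytes.length - 4) {
            throw new IllegalArgumentException("invalid value length: " + valueLength);
        }
        byte[] value = Arrays.copyOfRange(bytes, 4, 4 + valueLength);
        byte[] metadata = Arrays.copyOfRange(bytes, 4 + valueLength, bytes.length);
        return new byte[][] {value, metadata};
    }
}
```

For example, encoding a 3-byte value with 2 bytes of metadata yields 9 bytes whose first four are `00 00 00 03`. Because only the value length is stored, the metadata length is implied by the total size, which is why engines that carry the column as plain bytes can pass it through unchanged.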
   
   
   ### Documentation
   
   <!-- Does this change introduce a new feature -->
   - `website/docs/_configs/_partial_config.mdx`: Auto-generated config 
documentation updated (unrelated config changes included in the regeneration).
   - No new user-facing documentation page is needed at this stage. VARIANT 
type documentation will be added in a follow-up when end-to-end SQL support is 
available.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
