polyzos commented on code in PR #2814:
URL: https://github.com/apache/fluss/pull/2814#discussion_r2905166155
##########
fluss-client/src/test/java/org/apache/fluss/client/converter/RowToPojoConverterTest.java:
##########
@@ -41,7 +44,7 @@ public void testRoundTripFullSchema() {
ConvertersTestFixtures.TestPojo pojo =
ConvertersTestFixtures.TestPojo.sample();
GenericRow row = writer.toRow(pojo);
- assertThat(row.getFieldCount()).isEqualTo(15);
+ assertThat(row.getFieldCount()).isEqualTo(17);
Review Comment:
Why change this?
##########
fluss-client/src/main/java/org/apache/fluss/client/converter/PojoArrayToFlussArray.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.types.ArrayType;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeChecks;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.MapType;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Map;
+
+/** Adapter class for converting Pojo Array to Fluss InternalArray. */
+public class PojoArrayToFlussArray {
+ private final Object obj;
+ private final DataType fieldType;
+ private final String fieldName;
+
+ public PojoArrayToFlussArray(Object obj, DataType fieldType, String fieldName) {
+ this.obj = obj;
+ this.fieldType = fieldType;
+ this.fieldName = fieldName;
+ }
+
+ public GenericArray convertArray() {
+ if (obj == null) {
+ return null;
+ }
+
+ ArrayType arrayType = (ArrayType) fieldType;
+ DataType elementType = arrayType.getElementType();
+
+ // Handle primitive arrays directly
+ if (obj instanceof Boolean[]) {
Review Comment:
The fast path only checks boxed types (Boolean[], Long[], etc.). A field
declared as int[], long[], boolean[], etc. will not match any instanceof check
and will throw an IllegalArgumentException at runtime with a misleading message.
Add explicit handling for each primitive array type (int[], long[], double[],
float[], boolean[], short[], byte[]) alongside the existing boxed checks.
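One way to cover both cases is to normalize every supported Java array to Object[] before element conversion. A minimal sketch, assuming the converter can work from an Object[] of boxed elements; ArrayNormalizer and toObjectArray are hypothetical names, not part of Fluss:

```java
import java.util.Arrays;

/** Hypothetical helper (not part of Fluss): normalizes boxed and primitive arrays to Object[]. */
final class ArrayNormalizer {
    private ArrayNormalizer() {}

    static Object[] toObjectArray(Object obj, String fieldName) {
        // Mirror convertArray(): a null field stays null.
        if (obj == null) {
            return null;
        }
        // Boxed arrays (Integer[], Long[], Boolean[], ...) and other reference arrays.
        if (obj instanceof Object[]) {
            return (Object[]) obj;
        }
        // Primitive arrays never match a boxed instanceof check, so each needs its own branch.
        if (obj instanceof int[]) {
            return Arrays.stream((int[]) obj).boxed().toArray();
        }
        if (obj instanceof long[]) {
            return Arrays.stream((long[]) obj).boxed().toArray();
        }
        if (obj instanceof double[]) {
            return Arrays.stream((double[]) obj).boxed().toArray();
        }
        // There are no primitive streams for these types, so box element by element.
        if (obj instanceof float[]) {
            float[] a = (float[]) obj;
            Object[] out = new Object[a.length];
            for (int i = 0; i < a.length; i++) {
                out[i] = a[i]; // autoboxes to Float
            }
            return out;
        }
        if (obj instanceof boolean[]) {
            boolean[] a = (boolean[]) obj;
            Object[] out = new Object[a.length];
            for (int i = 0; i < a.length; i++) {
                out[i] = a[i]; // autoboxes to Boolean
            }
            return out;
        }
        if (obj instanceof short[]) {
            short[] a = (short[]) obj;
            Object[] out = new Object[a.length];
            for (int i = 0; i < a.length; i++) {
                out[i] = a[i]; // autoboxes to Short
            }
            return out;
        }
        if (obj instanceof byte[]) {
            byte[] a = (byte[]) obj;
            Object[] out = new Object[a.length];
            for (int i = 0; i < a.length; i++) {
                out[i] = a[i]; // autoboxes to Byte
            }
            return out;
        }
        // Name the actual runtime class so the error is not misleading.
        throw new IllegalArgumentException(
                String.format(
                        "Field %s: unsupported array class %s",
                        fieldName, obj.getClass().getName()));
    }
}
```

The resulting Object[] could then go through the same path the existing boxed branches use, so element conversion stays in one place.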
##########
fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java:
##########
@@ -120,14 +123,21 @@ static void validateCompatibility(DataType fieldType, PojoType.Property prop) {
"Unsupported field type %s for field %s.",
fieldType.getTypeRoot(), prop.name));
}
- if (!supported.contains(actual)) {
+ if (!supported.contains(actual)
+ & !SUPPORTED_COMPLEX_TYPES.contains(fieldType.getTypeRoot())) {
Review Comment:
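Did you mean && here? On booleans, & is the non-short-circuiting AND: it gives
the same result, but it always evaluates the right-hand operand.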
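In this condition both operands are side-effect-free set lookups, so & and && produce the same value; && is the conventional choice and skips the second lookup whenever the first operand is false. A minimal illustration of the evaluation difference (AndOperators and its methods are made up for this example):

```java
/** Illustration only: & evaluates both boolean operands, && may skip the right one. */
final class AndOperators {
    private static int rightEvaluations = 0;

    private static boolean right() {
        rightEvaluations++; // count how often the right operand is evaluated
        return true;
    }

    /** Evaluates (false AND right()) and returns how many times right() ran. */
    static int rightEvaluationsFor(boolean shortCircuit) {
        rightEvaluations = 0;
        boolean left = false;
        if (shortCircuit) {
            boolean ignored = left && right(); // right() is skipped because left is false
        } else {
            boolean ignored = left & right(); // right() still runs
        }
        return rightEvaluations;
    }
}
```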
##########
website/docs/_configs/_partial_config.mdx:
##########
@@ -78,7 +78,7 @@
| `client.lookup.batch-timeout` | `0 s` | Duration | The maximum time to wait for the lookup batch to full, if this timeout is reached, the lookup batch will be closed to send. |
| `client.lookup.max-retries` | `2147483647` | Integer | Setting a value greater than zero will cause the client to resend any lookup request that fails with a potentially transient error. |
| `client.scanner.remote-log.prefetch-num` | `4` | Integer | The number of remote log segments to keep in local temp file for LogScanner, which download from remote storage. The default setting is 4. |
-| `client.scanner.io.tmpdir` | `/var/folders/bp/v2l48kz51mx86d743qv0zhzh0000gn/T//fluss` | String | Local directory that is used by client for storing the data files (like kv snapshot, log segment files) to read temporarily |
+| `client.scanner.io.tmpdir` | `/var/folders/p1/4yvqmcrs6n92xl_w_k3z43th0000gn/T//fluss` | String | Local directory that is used by client for storing the data files (like kv snapshot, log segment files) to read temporarily |
Review Comment:
Why change this?
##########
fluss-client/src/test/java/org/apache/fluss/client/converter/ConvertersTestFixtures.java:
##########
@@ -172,8 +191,9 @@ public int hashCode() {
timeField,
timestampField,
timestampLtzField,
- offsetDateTimeField);
- result = 31 * result + Arrays.hashCode(bytesField);
+ offsetDateTimeField,
+ mapField);
+ result = 31 * result + Arrays.hashCode(bytesField) + Arrays.hashCode(arrayField);
Review Comment:
// Change to two separate lines to preserve hash distribution:
result = 31 * result + Arrays.hashCode(bytesField);
result = 31 * result + Arrays.hashCode(arrayField);
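Why the summed form is weaker: 31 * result + h(bytesField) + h(arrayField) is symmetric in the two array hashes, so two objects whose bytesField and arrayField hashes are swapped collide, whereas folding each hash in with its own multiply step keeps them apart. A small sketch; HashCombine is a hypothetical name, and the field types (byte[] and int[]) are assumptions chosen because Arrays.hashCode gives {1} and {2} the hashes 32 and 33 for both types:

```java
import java.util.Arrays;

/** Illustration only: compares the two ways of folding array hashes into hashCode(). */
final class HashCombine {
    private HashCombine() {}

    /** Pattern in the PR: both array hashes are added with the same weight. */
    static int summed(int result, byte[] bytesField, int[] arrayField) {
        return 31 * result + Arrays.hashCode(bytesField) + Arrays.hashCode(arrayField);
    }

    /** Suggested pattern: each array hash gets its own multiply-and-add step. */
    static int separate(int result, byte[] bytesField, int[] arrayField) {
        result = 31 * result + Arrays.hashCode(bytesField);
        result = 31 * result + Arrays.hashCode(arrayField);
        return result;
    }
}
```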
##########
website/docs/_configs/_partial_config.mdx:
##########
@@ -284,7 +284,7 @@
| `table.auto-partition.enabled` | `false` | Boolean | Whether enable auto partition for the table. Disable by default. When auto partition is enabled, the partitions of the table will be created automatically. |
| `table.auto-partition.key` | `none` | String | This configuration defines the time-based partition key to be used for auto-partitioning when a table is partitioned with multiple keys. Auto-partitioning utilizes a time-based partition key to handle partitions automatically, including creating new ones and removing outdated ones, by comparing the time value of the partition with the current system time. In the case of a table using multiple partition keys (such as a composite partitioning strategy), this feature determines which key should serve as the primary time dimension for making auto-partitioning decisions.And If the table has only one partition key, this config is not necessary. Otherwise, it must be specified. |
| `table.auto-partition.time-unit` | `DAY` | AutoPartitionTimeUnit | The time granularity for auto created partitions. The default value is `DAY`. Valid values are `HOUR`, `DAY`, `MONTH`, `QUARTER`, `YEAR`. If the value is `HOUR`, the partition format for auto created is yyyyMMddHH. If the value is `DAY`, the partition format for auto created is yyyyMMdd. If the value is `MONTH`, the partition format for auto created is yyyyMM. If the value is `QUARTER`, the partition format for auto created is yyyyQ. If the value is `YEAR`, the partition format for auto created is yyyy. |
-| `table.auto-partition.time-zone` | `Europe/Paris` | String | The time zone for auto partitions, which is by default the same as the system time zone. |
+| `table.auto-partition.time-zone` | `Asia/Ho_Chi_Minh` | String | The time zone for auto partitions, which is by default the same as the system time zone. |
Review Comment:
Why change this?
##########
fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java:
##########
@@ -42,6 +41,8 @@
final class ConverterCommons {
static final Map<DataTypeRoot, Set<Class<?>>> SUPPORTED_TYPES =
createSupportedTypes();
+ static final Set<DataTypeRoot> SUPPORTED_COMPLEX_TYPES =
+ EnumSet.of(DataTypeRoot.ARRAY, DataTypeRoot.MAP);
Review Comment:
SUPPORTED_COMPLEX_TYPES and the ARRAY/MAP entries in SUPPORTED_TYPES are
redundant.
##########
fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java:
##########
@@ -63,6 +64,8 @@ private static Map<DataTypeRoot, Set<Class<?>>> createSupportedTypes() {
map.put(
DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
setOf(java.time.Instant.class,
java.time.OffsetDateTime.class));
+ map.put(DataTypeRoot.ARRAY, setOf(java.util.Arrays.class));
Review Comment:
java.util.Arrays.class is a utility class, not an array type, so this
sentinel is wrong. More importantly, the SUPPORTED_COMPLEX_TYPES bypass on line
127 makes these two SUPPORTED_TYPES entries dead code.
Pick one approach: either validate the Java type properly for ARRAY/MAP and
remove SUPPORTED_COMPLEX_TYPES, or remove the dead SUPPORTED_TYPES entries and
keep the bypass.
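If the first option is chosen, note that Set.contains compares Class objects by equality, so no single sentinel class can stand for every array or map type (int[].class, String[].class, HashMap.class, ...); a predicate is needed instead. A minimal sketch, assuming ARRAY should accept both Java arrays and java.util.Collection (PojoArrayToFlussArray imports Collection, which suggests it handles collections) and MAP should accept any java.util.Map; ComplexTypeCheck is hypothetical and Root is a stand-in for Fluss's DataTypeRoot:

```java
import java.util.Collection;
import java.util.Map;

/** Hypothetical sketch of type-aware validation for ARRAY and MAP fields. */
final class ComplexTypeCheck {
    /** Stand-in for DataTypeRoot, reduced to the two complex roots. */
    enum Root {
        ARRAY,
        MAP
    }

    private ComplexTypeCheck() {}

    static boolean isCompatible(Root root, Class<?> javaType) {
        switch (root) {
            case ARRAY:
                // Any Java array (int[], Long[], String[], ...) or any Collection implementation.
                return javaType.isArray() || Collection.class.isAssignableFrom(javaType);
            case MAP:
                // Any Map implementation (HashMap, TreeMap, ...).
                return Map.class.isAssignableFrom(javaType);
            default:
                return false;
        }
    }
}
```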
##########
fluss-client/src/main/java/org/apache/fluss/client/converter/PojoMaptoFlussMap.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.converter;
+
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.types.ArrayType;
+import org.apache.fluss.types.MapType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** Adapter class for converting Pojo Map to Fluss InternalMap. */
+public class PojoMaptoFlussMap implements InternalMap {
+ private final Map<?, ?> pojoMap;
+ private final MapType mapType;
+ private final String fieldName;
+
+ public PojoMaptoFlussMap(Map<?, ?> pojoMap, MapType mapType, String fieldName) {
+ this.pojoMap = pojoMap;
+ this.mapType = mapType;
+ this.fieldName = fieldName;
+ }
+
+ @Override
+ public int size() {
+ return pojoMap.size();
+ }
+
+ @Override
+ public InternalArray keyArray() {
Review Comment:
keyArray() and valueArray() iterate the map independently, so keys and values
can misalign.
Two separate iterations over a Map (e.g. a HashMap) are not guaranteed to visit
entries in the same order, and the map could also be modified between the two
calls. Convert once at construction time and cache both arrays.
public InternalArray keyArray() {
    List<?> pojoArray = new ArrayList<>(pojoMap.keySet()); // iteration 1
    ...
}
public InternalArray valueArray() {
    List<?> pojoArray = new ArrayList<>(pojoMap.values()); // iteration 2: order may differ
    ...
}
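A minimal sketch of the suggested fix, assuming the adapter may snapshot the map when it is constructed: a single pass over entrySet() yields each key together with its value, so index i in both lists refers to the same entry. MapSnapshot is a hypothetical name; in PojoMaptoFlussMap the two lists would be converted to InternalArray once in the constructor and returned from keyArray() and valueArray().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: snapshot keys and values in one pass so their indices line up. */
final class MapSnapshot {
    private final List<Object> keys;
    private final List<Object> values;

    MapSnapshot(Map<?, ?> pojoMap) {
        keys = new ArrayList<>(pojoMap.size());
        values = new ArrayList<>(pojoMap.size());
        // One iteration over entrySet() keeps each key paired with its own value.
        for (Map.Entry<?, ?> entry : pojoMap.entrySet()) {
            keys.add(entry.getKey());
            values.add(entry.getValue());
        }
    }

    int size() {
        return keys.size();
    }

    List<Object> keys() {
        return keys;
    }

    List<Object> values() {
        return values;
    }
}
```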
--
This is an automated message from the Apache Git Service.