leekeiabstraction commented on code in PR #2578:
URL: https://github.com/apache/fluss/pull/2578#discussion_r2972160077


##########
fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java:
##########
@@ -355,4 +356,170 @@ private Schema createTable(LanceConfig config, Map<String, String> customPropert
 
         return schema;
     }
+
+    @ParameterizedTest
+    @MethodSource("tieringWriteArgs")
+    void testTieringWriteTableWithNestedRowType(boolean isPartitioned) throws Exception {
+        int bucketNum = 3;
+        TablePath tablePath = TablePath.of("lance", "nestedRowTable");
+        Map<String, String> customProperties = new HashMap<>();
+        customProperties.put("lance.batch_size", "256");
+        LanceConfig config =
+                LanceConfig.from(
+                        configuration.toMap(),
+                        customProperties,
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName());
+        Schema schema = createNestedRowTable(config);
+
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .distributedBy(bucketNum)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .customProperties(customProperties)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
+
+        List<LanceWriteResult> lanceWriteResults = new ArrayList<>();
+        SimpleVersionedSerializer<LanceWriteResult> writeResultSerializer =
+                lanceLakeTieringFactory.getWriteResultSerializer();
+        SimpleVersionedSerializer<LanceCommittable> committableSerializer =
+                lanceLakeTieringFactory.getCommittableSerializer();
+
+        Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new HashMap<>();
+        Map<Long, String> partitionIdAndName =
+                isPartitioned
+                        ? new HashMap<Long, String>() {
+                            {
+                                put(1L, "p1");
+                                put(2L, "p2");
+                                put(3L, "p3");
+                            }
+                        }
+                        : Collections.singletonMap(null, null);
+
+        // First, write data with nested row types
+        for (int bucket = 0; bucket < bucketNum; bucket++) {
+            for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
+                String partition = entry.getValue();
+                try (LakeWriter<LanceWriteResult> lakeWriter =
+                        createLakeWriter(tablePath, bucket, partition, tableInfo)) {
+                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
+                    Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
+                            genNestedRowLogRecords(partition, bucket, 10);
+                    List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
+                    List<LogRecord> expectRecords = writeAndExpectRecords.f1;
+                    recordsByBucket.put(partitionBucket, expectRecords);
+                    for (LogRecord logRecord : writtenRecords) {
+                        lakeWriter.write(logRecord);
+                    }
+                    // serialize/deserialize writeResult
+                    LanceWriteResult lanceWriteResult = lakeWriter.complete();
+                    byte[] serialized = writeResultSerializer.serialize(lanceWriteResult);
+                    lanceWriteResults.add(
+                            writeResultSerializer.deserialize(
+                                    writeResultSerializer.getVersion(), serialized));
+                }
+            }
+        }
+
+        // Second, commit data
+        try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
+                createLakeCommitter(tablePath, tableInfo)) {
+            // serialize/deserialize committable
+            LanceCommittable lanceCommittable = lakeCommitter.toCommittable(lanceWriteResults);
+            byte[] serialized = committableSerializer.serialize(lanceCommittable);
+            lanceCommittable =
+                    committableSerializer.deserialize(
+                            committableSerializer.getVersion(), serialized);
+            Map<String, String> snapshotProperties =
+                    Collections.singletonMap(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, "offsets");
+            LakeCommitResult commitResult =
+                    lakeCommitter.commit(lanceCommittable, snapshotProperties);
+            // lance dataset version starts from 1
+            assertThat(commitResult.getCommittedSnapshotId()).isEqualTo(2);
+        }
+
+        try (Dataset dataset =
+                Dataset.open(
+                        new RootAllocator(),
+                        config.getDatasetUri(),
+                        LanceConfig.genReadOptionFromConfig(config))) {
+            ArrowReader reader = dataset.newScan().scanBatches();
+            VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+
+            // Verify data can be read back
+            for (int bucket = 0; bucket < 3; bucket++) {
+                for (String partition : partitionIdAndName.values()) {
+                    reader.loadNextBatch();
+                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
+                    List<LogRecord> expectRecords = recordsByBucket.get(partitionBucket);
+                    verifyNestedRowRecords(readerRoot, expectRecords, bucket, isPartitioned);
+                }
+            }
+            assertThat(reader.loadNextBatch()).isFalse();
+        }
+    }
+
+    private Schema createNestedRowTable(LanceConfig config) {
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .column(
+                                "address",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("city", DataTypes.STRING()),
+                                        DataTypes.FIELD("zip", DataTypes.INT())));
+        Schema schema = schemaBuilder.build();
+        WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
+        LanceDatasetAdapter.createDataset(
+                config.getDatasetUri(), LanceArrowUtils.toArrowSchema(schema.getRowType()), params);
+
+        return schema;
+    }
+
+    private Tuple2<List<LogRecord>, List<LogRecord>> genNestedRowLogRecords(
+            @Nullable String partition, int bucket, int numRecords) {

Review Comment:
   `partition` is unused, remove?
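   
   A minimal sketch of that cleanup, for illustration (it assumes the write loop above is the only call site; `bucket` is already in scope there, so the call becomes `genNestedRowLogRecords(bucket, 10)`):
   
   ```java
   // Same body as in this PR, with only the unused `partition` parameter dropped.
   private Tuple2<List<LogRecord>, List<LogRecord>> genNestedRowLogRecords(int bucket, int numRecords) {
       List<LogRecord> logRecords = new ArrayList<>();
       for (int i = 0; i < numRecords; i++) {
           GenericRow genericRow = new GenericRow(3);
           genericRow.setField(0, i);
           genericRow.setField(1, BinaryString.fromString("user" + bucket + "_" + i));

           // Create nested address row
           GenericRow addressRow = new GenericRow(2);
           addressRow.setField(0, BinaryString.fromString("city" + bucket));
           addressRow.setField(1, 10000 + bucket);
           genericRow.setField(2, addressRow);

           logRecords.add(
                   new GenericRecord(i, System.currentTimeMillis(), ChangeType.APPEND_ONLY, genericRow));
       }
       return Tuple2.of(logRecords, logRecords);
   }
   ```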



##########
fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java:
##########
@@ -355,4 +356,170 @@ private Schema createTable(LanceConfig config, Map<String, String> customPropert
 
         return schema;
     }
+
+    @ParameterizedTest
+    @MethodSource("tieringWriteArgs")
+    void testTieringWriteTableWithNestedRowType(boolean isPartitioned) throws Exception {
+        int bucketNum = 3;
+        TablePath tablePath = TablePath.of("lance", "nestedRowTable");
+        Map<String, String> customProperties = new HashMap<>();
+        customProperties.put("lance.batch_size", "256");
+        LanceConfig config =
+                LanceConfig.from(
+                        configuration.toMap(),
+                        customProperties,
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName());
+        Schema schema = createNestedRowTable(config);
+
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .distributedBy(bucketNum)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .customProperties(customProperties)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
+
+        List<LanceWriteResult> lanceWriteResults = new ArrayList<>();
+        SimpleVersionedSerializer<LanceWriteResult> writeResultSerializer =
+                lanceLakeTieringFactory.getWriteResultSerializer();
+        SimpleVersionedSerializer<LanceCommittable> committableSerializer =
+                lanceLakeTieringFactory.getCommittableSerializer();
+
+        Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new HashMap<>();
+        Map<Long, String> partitionIdAndName =
+                isPartitioned
+                        ? new HashMap<Long, String>() {
+                            {
+                                put(1L, "p1");
+                                put(2L, "p2");
+                                put(3L, "p3");
+                            }
+                        }
+                        : Collections.singletonMap(null, null);
+
+        // First, write data with nested row types
+        for (int bucket = 0; bucket < bucketNum; bucket++) {
+            for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
+                String partition = entry.getValue();
+                try (LakeWriter<LanceWriteResult> lakeWriter =
+                        createLakeWriter(tablePath, bucket, partition, tableInfo)) {
+                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
+                    Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
+                            genNestedRowLogRecords(partition, bucket, 10);
+                    List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
+                    List<LogRecord> expectRecords = writeAndExpectRecords.f1;
+                    recordsByBucket.put(partitionBucket, expectRecords);
+                    for (LogRecord logRecord : writtenRecords) {
+                        lakeWriter.write(logRecord);
+                    }
+                    // serialize/deserialize writeResult
+                    LanceWriteResult lanceWriteResult = lakeWriter.complete();
+                    byte[] serialized = writeResultSerializer.serialize(lanceWriteResult);
+                    lanceWriteResults.add(
+                            writeResultSerializer.deserialize(
+                                    writeResultSerializer.getVersion(), serialized));
+                }
+            }
+        }
+
+        // Second, commit data
+        try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
+                createLakeCommitter(tablePath, tableInfo)) {
+            // serialize/deserialize committable
+            LanceCommittable lanceCommittable = lakeCommitter.toCommittable(lanceWriteResults);
+            byte[] serialized = committableSerializer.serialize(lanceCommittable);
+            lanceCommittable =
+                    committableSerializer.deserialize(
+                            committableSerializer.getVersion(), serialized);
+            Map<String, String> snapshotProperties =
+                    Collections.singletonMap(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, "offsets");
+            LakeCommitResult commitResult =
+                    lakeCommitter.commit(lanceCommittable, snapshotProperties);
+            // lance dataset version starts from 1
+            assertThat(commitResult.getCommittedSnapshotId()).isEqualTo(2);
+        }
+
+        try (Dataset dataset =
+                Dataset.open(
+                        new RootAllocator(),
+                        config.getDatasetUri(),
+                        LanceConfig.genReadOptionFromConfig(config))) {
+            ArrowReader reader = dataset.newScan().scanBatches();
+            VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+
+            // Verify data can be read back
+            for (int bucket = 0; bucket < 3; bucket++) {
+                for (String partition : partitionIdAndName.values()) {
+                    reader.loadNextBatch();
+                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
+                    List<LogRecord> expectRecords = recordsByBucket.get(partitionBucket);
+                    verifyNestedRowRecords(readerRoot, expectRecords, bucket, isPartitioned);
+                }
+            }
+            assertThat(reader.loadNextBatch()).isFalse();
+        }
+    }
+
+    private Schema createNestedRowTable(LanceConfig config) {
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .column(
+                                "address",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("city", DataTypes.STRING()),
+                                        DataTypes.FIELD("zip", DataTypes.INT())));
+        Schema schema = schemaBuilder.build();
+        WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
+        LanceDatasetAdapter.createDataset(
+                config.getDatasetUri(), LanceArrowUtils.toArrowSchema(schema.getRowType()), params);
+
+        return schema;
+    }
+
+    private Tuple2<List<LogRecord>, List<LogRecord>> genNestedRowLogRecords(
+            @Nullable String partition, int bucket, int numRecords) {
+        List<LogRecord> logRecords = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            GenericRow genericRow = new GenericRow(3);
+            genericRow.setField(0, i);
+            genericRow.setField(1, BinaryString.fromString("user" + bucket + "_" + i));
+
+            // Create nested address row
+            GenericRow addressRow = new GenericRow(2);
+            addressRow.setField(0, BinaryString.fromString("city" + bucket));
+            addressRow.setField(1, 10000 + bucket);
+            genericRow.setField(2, addressRow);
+
+            LogRecord logRecord =
+                    new GenericRecord(
+                            i, System.currentTimeMillis(), ChangeType.APPEND_ONLY, genericRow);
+            logRecords.add(logRecord);
+        }
+        return Tuple2.of(logRecords, logRecords);
+    }
+
+    private void verifyNestedRowRecords(
+            VectorSchemaRoot root,
+            List<LogRecord> expectRecords,
+            int expectBucket,
+            boolean isPartitioned)

Review Comment:
   Remove unused args.
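   
   The method body isn't quoted in this hunk, so which args are dead is an assumption; if `expectBucket` and `isPartitioned` are the unused ones, the trimmed signature (and the call in the read-back loop) would look like:
   
   ```java
   // Hypothetical: drop whichever parameters the body really doesn't read.
   private void verifyNestedRowRecords(VectorSchemaRoot root, List<LogRecord> expectRecords)
           throws Exception {
       // ... existing vector-vs-record assertions unchanged ...
   }

   // Call site in the read-back loop:
   verifyNestedRowRecords(readerRoot, expectRecords);
   ```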



##########
fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java:
##########
@@ -355,4 +356,170 @@ private Schema createTable(LanceConfig config, Map<String, String> customPropert
 
         return schema;
     }
+
+    @ParameterizedTest
+    @MethodSource("tieringWriteArgs")
+    void testTieringWriteTableWithNestedRowType(boolean isPartitioned) throws Exception {
+        int bucketNum = 3;
+        TablePath tablePath = TablePath.of("lance", "nestedRowTable");
+        Map<String, String> customProperties = new HashMap<>();
+        customProperties.put("lance.batch_size", "256");
+        LanceConfig config =
+                LanceConfig.from(
+                        configuration.toMap(),
+                        customProperties,
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName());
+        Schema schema = createNestedRowTable(config);
+
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .distributedBy(bucketNum)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .customProperties(customProperties)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
+
+        List<LanceWriteResult> lanceWriteResults = new ArrayList<>();
+        SimpleVersionedSerializer<LanceWriteResult> writeResultSerializer =
+                lanceLakeTieringFactory.getWriteResultSerializer();
+        SimpleVersionedSerializer<LanceCommittable> committableSerializer =
+                lanceLakeTieringFactory.getCommittableSerializer();
+
+        Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new HashMap<>();
+        Map<Long, String> partitionIdAndName =
+                isPartitioned
+                        ? new HashMap<Long, String>() {
+                            {
+                                put(1L, "p1");
+                                put(2L, "p2");
+                                put(3L, "p3");
+                            }
+                        }
+                        : Collections.singletonMap(null, null);
+
+        // First, write data with nested row types
+        for (int bucket = 0; bucket < bucketNum; bucket++) {
+            for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
+                String partition = entry.getValue();
+                try (LakeWriter<LanceWriteResult> lakeWriter =
+                        createLakeWriter(tablePath, bucket, partition, tableInfo)) {
+                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
+                    Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
+                            genNestedRowLogRecords(partition, bucket, 10);
+                    List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
+                    List<LogRecord> expectRecords = writeAndExpectRecords.f1;
+                    recordsByBucket.put(partitionBucket, expectRecords);
+                    for (LogRecord logRecord : writtenRecords) {
+                        lakeWriter.write(logRecord);
+                    }
+                    // serialize/deserialize writeResult
+                    LanceWriteResult lanceWriteResult = lakeWriter.complete();
+                    byte[] serialized = writeResultSerializer.serialize(lanceWriteResult);
+                    lanceWriteResults.add(
+                            writeResultSerializer.deserialize(
+                                    writeResultSerializer.getVersion(), serialized));
+                }
+            }
+        }
+
+        // Second, commit data
+        try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
+                createLakeCommitter(tablePath, tableInfo)) {
+            // serialize/deserialize committable
+            LanceCommittable lanceCommittable = lakeCommitter.toCommittable(lanceWriteResults);
+            byte[] serialized = committableSerializer.serialize(lanceCommittable);
+            lanceCommittable =
+                    committableSerializer.deserialize(
+                            committableSerializer.getVersion(), serialized);
+            Map<String, String> snapshotProperties =
+                    Collections.singletonMap(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, "offsets");
+            LakeCommitResult commitResult =
+                    lakeCommitter.commit(lanceCommittable, snapshotProperties);
+            // lance dataset version starts from 1
+            assertThat(commitResult.getCommittedSnapshotId()).isEqualTo(2);
+        }
+
+        try (Dataset dataset =
+                Dataset.open(
+                        new RootAllocator(),
+                        config.getDatasetUri(),
+                        LanceConfig.genReadOptionFromConfig(config))) {
+            ArrowReader reader = dataset.newScan().scanBatches();
+            VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+
+            // Verify data can be read back
+            for (int bucket = 0; bucket < 3; bucket++) {
+                for (String partition : partitionIdAndName.values()) {
+                    reader.loadNextBatch();
+                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
+                    List<LogRecord> expectRecords = recordsByBucket.get(partitionBucket);
+                    verifyNestedRowRecords(readerRoot, expectRecords, bucket, isPartitioned);
+                }
+            }
+            assertThat(reader.loadNextBatch()).isFalse();
+        }
+    }
+
+    private Schema createNestedRowTable(LanceConfig config) {
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .column(
+                                "address",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("city", DataTypes.STRING()),
+                                        DataTypes.FIELD("zip", DataTypes.INT())));
+        Schema schema = schemaBuilder.build();
+        WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
+        LanceDatasetAdapter.createDataset(
+                config.getDatasetUri(), LanceArrowUtils.toArrowSchema(schema.getRowType()), params);
+
+        return schema;
+    }
+
+    private Tuple2<List<LogRecord>, List<LogRecord>> genNestedRowLogRecords(
+            @Nullable String partition, int bucket, int numRecords) {
+        List<LogRecord> logRecords = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            GenericRow genericRow = new GenericRow(3);
+            genericRow.setField(0, i);
+            genericRow.setField(1, BinaryString.fromString("user" + bucket + "_" + i));
+
+            // Create nested address row
+            GenericRow addressRow = new GenericRow(2);
+            addressRow.setField(0, BinaryString.fromString("city" + bucket));
+            addressRow.setField(1, 10000 + bucket);
+            genericRow.setField(2, addressRow);
+
+            LogRecord logRecord =
+                    new GenericRecord(
+                            i, System.currentTimeMillis(), ChangeType.APPEND_ONLY, genericRow);
+            logRecords.add(logRecord);
+        }
+        return Tuple2.of(logRecords, logRecords);
+    }
+
+    private void verifyNestedRowRecords(
+            VectorSchemaRoot root,
+            List<LogRecord> expectRecords,
+            int expectBucket,
+            boolean isPartitioned)
+            throws Exception {

Review Comment:
   None of the method calls here throw checked exceptions; remove the `throws Exception` clause?
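   
   A minimal sketch of that change (assuming the body only uses AssertJ assertions and Arrow vector accessors, none of which declare checked exceptions); the enclosing test method already declares `throws Exception`, so callers need no changes either way:
   
   ```java
   private void verifyNestedRowRecords(
           VectorSchemaRoot root,
           List<LogRecord> expectRecords,
           int expectBucket,
           boolean isPartitioned) {
       // ... body unchanged; only the `throws Exception` clause is dropped ...
   }
   ```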



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
