Yohahaha commented on code in PR #2956:
URL: https://github.com/apache/fluss/pull/2956#discussion_r3027321020


##########
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTestBase.scala:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.lake
+
+import org.apache.fluss.config.{ConfigOptions, Configuration}
+import org.apache.fluss.lake.committer.{CommitterInitContext, LakeCommitter}
+import org.apache.fluss.lake.writer.{LakeTieringFactory, WriterInitContext}
+import org.apache.fluss.metadata.{TableBucket, TableInfo, TablePath}
+import org.apache.fluss.rpc.messages.{CommitLakeTableSnapshotRequest, 
PbLakeTableOffsetForBucket, PbLakeTableSnapshotInfo}
+import org.apache.fluss.spark.FlussSparkTestBase
+import org.apache.fluss.spark.SparkConnectorOptions.BUCKET_NUMBER
+
+import org.apache.spark.sql.Row
+
+import java.time.Duration
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+/**
+ * Base class for lake-enabled log table read tests. Subclasses provide the 
lake format config and
+ * tiering factory.
+ */
+abstract class SparkLakeLogTableReadTestBase extends FlussSparkTestBase {
+
+  protected var warehousePath: String = _
+
+  protected def createLakeTieringFactory(): LakeTieringFactory[_, _]
+
+  override protected def withTable(tableNames: String*)(f: => Unit): Unit = {
+    try {
+      f
+    } finally {
+      tableNames.foreach(t => sql(s"DROP TABLE IF EXISTS 
$DEFAULT_DATABASE.$t"))
+    }
+  }
+
+  private def tierToLake(tp: TablePath, ti: TableInfo, expectedRecordCount: 
Int): Long = {
+    val tableId = ti.getTableId
+
+    val table = loadFlussTable(tp)
+    val logScanner = table.newScan().createLogScanner()
+    logScanner.subscribeFromBeginning(0)
+
+    val scanRecords =
+      new 
java.util.ArrayList[org.apache.fluss.client.table.scanner.ScanRecord]()
+    val deadline = System.currentTimeMillis() + 30000
+    while (scanRecords.size() < expectedRecordCount && 
System.currentTimeMillis() < deadline) {
+      val batch = logScanner.poll(Duration.ofSeconds(1))
+      batch.iterator().asScala.foreach(r => scanRecords.add(r))
+    }
+    assert(
+      scanRecords.size() == expectedRecordCount,
+      s"Expected $expectedRecordCount scan records, got ${scanRecords.size()}")
+    val logEndOffset = scanRecords.asScala.map(_.logOffset()).max + 1
+
+    val factory = createLakeTieringFactory()
+
+    val tb = new TableBucket(tableId, null, 0)
+    val lakeWriter = factory
+      .asInstanceOf[LakeTieringFactory[Any, Any]]
+      .createLakeWriter(new WriterInitContext {
+        override def tablePath(): TablePath = tp
+        override def tableBucket(): TableBucket = tb
+        override def partition(): String = null
+        override def tableInfo(): TableInfo = ti
+      })
+    for (record <- scanRecords.asScala) {
+      lakeWriter.write(record)
+    }
+    val writeResult = lakeWriter.complete()
+    lakeWriter.close()
+
+    val lakeCommitter = factory
+      .asInstanceOf[LakeTieringFactory[Any, Any]]
+      .createLakeCommitter(new CommitterInitContext {
+        override def tablePath(): TablePath = tp
+        override def tableInfo(): TableInfo = ti
+        override def lakeTieringConfig(): Configuration = new Configuration()
+        override def flussClientConfig(): Configuration = new Configuration()
+      })
+    val committable =
+      lakeCommitter.toCommittable(Collections.singletonList(writeResult))
+    val commitResult =
+      lakeCommitter.commit(committable, Collections.emptyMap())
+    val snapshotId = commitResult.getCommittedSnapshotId
+    lakeCommitter.close()
+
+    val coordinatorGateway = flussServer.newCoordinatorClient()
+    val request = new CommitLakeTableSnapshotRequest()
+    val tableReq: PbLakeTableSnapshotInfo = request.addTablesReq()
+    tableReq.setTableId(tableId)
+    tableReq.setSnapshotId(snapshotId)
+    val bucketReq: PbLakeTableOffsetForBucket = tableReq.addBucketsReq()
+    bucketReq.setBucketId(0)
+    bucketReq.setLogEndOffset(logEndOffset)
+    bucketReq.setMaxTimestamp(System.currentTimeMillis())
+    coordinatorGateway.commitLakeTableSnapshot(request).get()
+
+    Thread.sleep(2000)
+
+    logScanner.close()
+    table.close()
+
+    logEndOffset
+  }
+
+  test("Spark Lake Read: log table falls back when no lake snapshot") {
+    withTable("t") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t (id INT, name STRING)
+             | TBLPROPERTIES (
+             |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             |  '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(1, "hello"), (2, "world"), (3, "fluss")
+             |""".stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY id"),
+        Row(1, "hello") :: Row(2, "world") :: Row(3, "fluss") :: Nil
+      )
+
+      checkAnswer(
+        sql(s"SELECT name FROM $DEFAULT_DATABASE.t ORDER BY name"),
+        Row("fluss") :: Row("hello") :: Row("world") :: Nil
+      )
+    }
+  }
+
+  test("Spark Lake Read: log table lake-only (all data in lake, no log tail)") 
{
+    withTable("t_lake_only") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t_lake_only (id INT, name STRING)
+             | TBLPROPERTIES (
+             |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             |  '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_lake_only VALUES
+             |(1, "alpha"), (2, "beta"), (3, "gamma")
+             |""".stripMargin)
+
+      val tablePath = createTablePath("t_lake_only")
+      val table = loadFlussTable(tablePath)
+      val tableInfo = table.getTableInfo
+      table.close()

Review Comment:
   It seems there is no need to call `close` explicitly here.



##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, 
OffsetsInitializer}
+import org.apache.fluss.client.table.scanner.log.LogScanner
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.exception.LakeTableSnapshotNotExistException
+import org.apache.fluss.lake.serializer.SimpleVersionedSerializer
+import org.apache.fluss.lake.source.{LakeSource, LakeSplit}
+import org.apache.fluss.metadata.{ResolvedPartitionSpec, TableBucket, 
TableInfo, TablePath}
+import org.apache.fluss.utils.ExceptionUtils
+
+import org.apache.spark.sql.connector.read.{InputPartition, 
PartitionReaderFactory}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/** Batch for reading lake-enabled log table (append-only table with 
datalake). */
+class FlussLakeAppendBatch(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    readSchema: StructType,
+    options: CaseInsensitiveStringMap,
+    flussConfig: Configuration)
+  extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
+
+  // Required by FlussBatch but unused — lake snapshot determines start 
offsets.
+  override val startOffsetsInitializer: OffsetsInitializer = 
OffsetsInitializer.earliest()
+
+  override val stoppingOffsetsInitializer: OffsetsInitializer = {
+    FlussOffsetInitializers.stoppingOffsetsInitializer(true, options, 
flussConfig)
+  }
+
+  private lazy val planned: (Array[InputPartition], Boolean) = doPlan()
+
+  override def planInputPartitions(): Array[InputPartition] = planned._1
+
+  override def createReaderFactory(): PartitionReaderFactory = {
+    if (planned._2) {
+      new FlussAppendPartitionReaderFactory(tablePath, projection, options, 
flussConfig)
+    } else {
+      new FlussLakeAppendPartitionReaderFactory(
+        tableInfo.getProperties.toMap,
+        tablePath,
+        tableInfo.getRowType,
+        projection,
+        flussConfig)
+    }
+  }
+
+  private def doPlan(): (Array[InputPartition], Boolean) = {

Review Comment:
   When the second element of the returned tuple is true, does it mean that no
   lake split was found? Please add a comment documenting this method.



##########
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTestBase.scala:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.lake
+
+import org.apache.fluss.config.{ConfigOptions, Configuration}
+import org.apache.fluss.lake.committer.{CommitterInitContext, LakeCommitter}
+import org.apache.fluss.lake.writer.{LakeTieringFactory, WriterInitContext}
+import org.apache.fluss.metadata.{TableBucket, TableInfo, TablePath}
+import org.apache.fluss.rpc.messages.{CommitLakeTableSnapshotRequest, 
PbLakeTableOffsetForBucket, PbLakeTableSnapshotInfo}
+import org.apache.fluss.spark.FlussSparkTestBase
+import org.apache.fluss.spark.SparkConnectorOptions.BUCKET_NUMBER
+
+import org.apache.spark.sql.Row
+
+import java.time.Duration
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+/**
+ * Base class for lake-enabled log table read tests. Subclasses provide the 
lake format config and
+ * tiering factory.
+ */
+abstract class SparkLakeLogTableReadTestBase extends FlussSparkTestBase {
+
+  protected var warehousePath: String = _
+
+  protected def createLakeTieringFactory(): LakeTieringFactory[_, _]
+
+  override protected def withTable(tableNames: String*)(f: => Unit): Unit = {
+    try {
+      f
+    } finally {
+      tableNames.foreach(t => sql(s"DROP TABLE IF EXISTS 
$DEFAULT_DATABASE.$t"))
+    }
+  }
+
+  private def tierToLake(tp: TablePath, ti: TableInfo, expectedRecordCount: 
Int): Long = {

Review Comment:
   I'm not sure whether we can introduce a Flink tiering test module, like
   `PaimonTieringITCase`, to simulate production usage.



##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, 
OffsetsInitializer}
+import org.apache.fluss.client.table.scanner.log.LogScanner
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.exception.LakeTableSnapshotNotExistException
+import org.apache.fluss.lake.serializer.SimpleVersionedSerializer
+import org.apache.fluss.lake.source.{LakeSource, LakeSplit}
+import org.apache.fluss.metadata.{ResolvedPartitionSpec, TableBucket, 
TableInfo, TablePath}
+import org.apache.fluss.utils.ExceptionUtils
+
+import org.apache.spark.sql.connector.read.{InputPartition, 
PartitionReaderFactory}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/** Batch for reading lake-enabled log table (append-only table with 
datalake). */
+class FlussLakeAppendBatch(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    readSchema: StructType,
+    options: CaseInsensitiveStringMap,
+    flussConfig: Configuration)
+  extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
+
+  // Required by FlussBatch but unused — lake snapshot determines start 
offsets.
+  override val startOffsetsInitializer: OffsetsInitializer = 
OffsetsInitializer.earliest()
+
+  override val stoppingOffsetsInitializer: OffsetsInitializer = {
+    FlussOffsetInitializers.stoppingOffsetsInitializer(true, options, 
flussConfig)
+  }
+
+  private lazy val planned: (Array[InputPartition], Boolean) = doPlan()
+
+  override def planInputPartitions(): Array[InputPartition] = planned._1
+
+  override def createReaderFactory(): PartitionReaderFactory = {
+    if (planned._2) {
+      new FlussAppendPartitionReaderFactory(tablePath, projection, options, 
flussConfig)
+    } else {
+      new FlussLakeAppendPartitionReaderFactory(
+        tableInfo.getProperties.toMap,
+        tablePath,
+        tableInfo.getRowType,
+        projection,
+        flussConfig)
+    }
+  }
+
+  private def doPlan(): (Array[InputPartition], Boolean) = {
+    val lakeSnapshot =
+      try {
+        admin.getReadableLakeSnapshot(tablePath).get()
+      } catch {
+        case e: Exception =>
+          if (
+            ExceptionUtils
+              .stripExecutionException(e)
+              .isInstanceOf[LakeTableSnapshotNotExistException]
+          ) {
+            return (planFallbackPartitions(), true)
+          }
+          throw e
+      }
+
+    val lakeSource = 
FlussLakeSourceUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath)
+    lakeSource.withProject(FlussLakeSourceUtils.lakeProjection(projection))
+
+    val lakeSplits = lakeSource
+      .createPlanner(new LakeSource.PlannerContext {
+        override def snapshotId(): Long = lakeSnapshot.getSnapshotId
+      })
+      .plan()
+
+    val splitSerializer = lakeSource.getSplitSerializer
+    val tableBucketsOffset = lakeSnapshot.getTableBucketsOffset
+    val buckets = (0 until tableInfo.getNumBuckets).toSeq
+    val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, 
tablePath)
+
+    val partitions = if (tableInfo.isPartitioned) {
+      planPartitionedTable(
+        lakeSplits.asScala,
+        splitSerializer,
+        tableBucketsOffset,
+        buckets,
+        bucketOffsetsRetriever)
+    } else {
+      planNonPartitionedTable(
+        lakeSplits.asScala,
+        splitSerializer,
+        tableBucketsOffset,
+        buckets,
+        bucketOffsetsRetriever)
+    }
+
+    (partitions, false)
+  }
+
+  private def planNonPartitionedTable(
+      lakeSplits: Seq[LakeSplit],
+      splitSerializer: SimpleVersionedSerializer[LakeSplit],
+      tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long],
+      buckets: Seq[Int],
+      bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): 
Array[InputPartition] = {
+    val result = mutable.ArrayBuffer.empty[InputPartition]
+    val tableId = tableInfo.getTableId
+
+    addLakePartitions(result, lakeSplits, splitSerializer, tableId, 
partitionId = null)
+
+    val stoppingOffsets =
+      getBucketOffsets(stoppingOffsetsInitializer, null, buckets, 
bucketOffsetsRetriever)
+    buckets.foreach {
+      bucketId =>
+        val tableBucket = new TableBucket(tableId, bucketId)
+        addLogTailPartition(result, tableBucket, tableBucketsOffset, 
stoppingOffsets(bucketId))
+    }
+
+    result.toArray
+  }
+
+  private def planPartitionedTable(
+      lakeSplits: Seq[LakeSplit],
+      splitSerializer: SimpleVersionedSerializer[LakeSplit],
+      tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long],
+      buckets: Seq[Int],
+      bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): 
Array[InputPartition] = {
+    val result = mutable.ArrayBuffer.empty[InputPartition]

Review Comment:
   Build the full partition list by merging the lake partitions with the log
   partitions, and make the `addXXXPartitions` helpers purer functions: they
   should return their results instead of mutating the input argument.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to