gh-yzou commented on code in PR #2239:
URL: https://github.com/apache/polaris/pull/2239#discussion_r2249026117


##########
plugins/spark/README.md:
##########
@@ -83,6 +86,9 @@ bin/spark-shell \
 --conf spark.sql.catalog.polaris.credential="root:secret" \
 --conf spark.sql.catalog.polaris.scope='PRINCIPAL_ROLE:ALL' \
 --conf spark.sql.catalog.polaris.token-refresh-enabled=true \
+--conf 
spark.sql.catalog.<lance-catalog-name>=com.lancedb.lance.spark.LanceNamespaceSparkCatalog
 \

Review Comment:
   Sorry, I am not very familiar with LanceDB and lance-spark — could you
explain what these configurations are and how they are used in
Spark? Thanks!



##########
plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkLanceIT.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.quarkus.it;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.polaris.service.it.env.IntegrationTestsHelper;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+@QuarkusIntegrationTest
+public class SparkLanceIT extends SparkIntegrationBase {

Review Comment:
   Based on the README, you will need to build a Spark session with the correct
configuration; you can do this by overriding the buildSparkSession function,
as in
https://github.com/apache/polaris/blob/main/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIcebergIT.java#L29



##########
plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkLanceIT.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.quarkus.it;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.polaris.service.it.env.IntegrationTestsHelper;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+@QuarkusIntegrationTest
+public class SparkLanceIT extends SparkIntegrationBase {
+  private String defaultNs;
+  private String tableRootDir;
+
+  private String getTableLocation(String tableName) {
+    return String.format("%s/%s", tableRootDir, tableName);
+  }
+
+  private String getTableNameWithRandomSuffix() {
+    return generateName("lancetb");
+  }
+
+  @BeforeEach
+  public void createDefaultResources(@TempDir Path tempDir) {
+    spark.sparkContext().setLogLevel("WARN");
+    defaultNs = generateName("lance");
+    // create a default namespace
+    sql("CREATE NAMESPACE %s", defaultNs);
+    sql("USE NAMESPACE %s", defaultNs);
+    tableRootDir =
+        
IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve(defaultNs).getPath();
+  }
+
+  @AfterEach
+  public void cleanupLanceData() {
+    // clean up lance data
+    File dirToDelete = new File(tableRootDir);
+    FileUtils.deleteQuietly(dirToDelete);
+    sql("DROP NAMESPACE %s", defaultNs);
+  }
+
+  @Test
+  public void testBasicTableOperations() {
+    // create a regular lance table with provider property
+    String lancetb1 = "lancetb1";
+    sql(
+        "CREATE TABLE %s (id INT, name STRING) LOCATION '%s' TBLPROPERTIES 
('provider' = 'lance')",
+        lancetb1, getTableLocation(lancetb1));
+    sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", lancetb1);
+    List<Object[]> results = sql("SELECT * FROM %s WHERE id > 1 ORDER BY id 
DESC", lancetb1);
+    assertThat(results.size()).isEqualTo(1);
+    assertThat(results.get(0)).isEqualTo(new Object[] {2, "bob"});
+
+    // Lance doesn't support partitioned tables, so we'll test a table with 
more complex data types
+    String lancetb2 = "lancetb2";
+    sql(
+        "CREATE TABLE %s (name String, age INT, score DOUBLE, active BOOLEAN) 
LOCATION '%s' TBLPROPERTIES ('provider' = 'lance')",
+        lancetb2, getTableLocation(lancetb2));
+    sql(
+        "INSERT INTO %s VALUES ('anna', 10, 95.5, true), ('james', 32, 87.2, 
false), ('yan', 16, 92.0, true)",
+        lancetb2);
+    results = sql("SELECT name, score FROM %s ORDER BY age", lancetb2);
+    assertThat(results.size()).isEqualTo(3);
+    assertThat(results.get(0)).isEqualTo(new Object[] {"anna", 95.5});
+    assertThat(results.get(1)).isEqualTo(new Object[] {"yan", 92.0});
+    assertThat(results.get(2)).isEqualTo(new Object[] {"james", 87.2});
+
+    // drop table
+    sql("DROP TABLE %s", lancetb1);
+    sql("DROP TABLE %s", lancetb2);
+
+    // check drop works
+    assertThatThrownBy(() -> sql("SELECT * FROM %s", lancetb1))
+        .hasMessageContaining("TABLE_OR_VIEW_NOT_FOUND");
+  }
+
+  @Test
+  public void testCreateTableWithLocationViaSparkDataFrame() {
+    String lancetb1 = getTableNameWithRandomSuffix();
+    String lancetb2 = getTableNameWithRandomSuffix();
+
+    // create a dataframe with some data
+    StructType schema =
+        new StructType(
+            new StructField[] {
+              new StructField("id", DataTypes.IntegerType, false, 
Metadata.empty()),
+              new StructField("name", DataTypes.StringType, false, 
Metadata.empty())
+            });
+    List<Row> data = Arrays.asList(RowFactory.create(1, "anna"), 
RowFactory.create(2, "bob"));
+    Dataset<Row> df = spark.createDataFrame(data, schema);
+
+    // write the dataframe to a lance table
+    df.write()
+        .mode("overwrite")
+        .format("lance")
+        .option("path", getTableLocation(lancetb1))
+        .saveAsTable(lancetb1);
+
+    // read the table
+    List<Object[]> results = sql("SELECT * FROM %s ORDER BY id", lancetb1);
+    assertThat(results.size()).isEqualTo(2);
+    assertThat(results.get(0)).isEqualTo(new Object[] {1, "anna"});
+    assertThat(results.get(1)).isEqualTo(new Object[] {2, "bob"});
+
+    // create another table with different data
+    List<Row> data2 = Arrays.asList(RowFactory.create(3, "charlie"), 
RowFactory.create(4, "david"));
+    Dataset<Row> df2 = spark.createDataFrame(data2, schema);
+    df2.write()
+        .mode("overwrite")
+        .format("lance")
+        .option("path", getTableLocation(lancetb2))
+        .saveAsTable(lancetb2);
+
+    // verify data isolation
+    results = sql("SELECT * FROM %s ORDER BY id", lancetb2);
+    assertThat(results.size()).isEqualTo(2);
+    assertThat(results.get(0)).isEqualTo(new Object[] {3, "charlie"});
+    assertThat(results.get(1)).isEqualTo(new Object[] {4, "david"});
+
+    sql("DROP TABLE %s", lancetb1);
+    sql("DROP TABLE %s", lancetb2);
+  }
+
+  @Test
+  public void testComplexDataTypes() {
+    String lancetb = getTableNameWithRandomSuffix();
+
+    // Create table with array and struct types
+    sql(
+        "CREATE TABLE %s (id INT, tags ARRAY<STRING>, info STRUCT<city:STRING, 
zip:INT>) LOCATION '%s' TBLPROPERTIES ('provider' = 'lance')",
+        lancetb, getTableLocation(lancetb));
+
+    // Insert data with complex types
+    sql(
+        "INSERT INTO %s VALUES (1, array('tag1', 'tag2'), struct('NYC', 
10001)), (2, array('tag3'), struct('SF', 94105))",
+        lancetb);
+
+    // Query complex types
+    List<Object[]> results = sql("SELECT id, tags[0], info.city FROM %s ORDER 
BY id", lancetb);
+    assertThat(results.size()).isEqualTo(2);
+    assertThat(results.get(0)[0]).isEqualTo(1);
+    assertThat(results.get(0)[1]).isEqualTo("tag1");
+    assertThat(results.get(0)[2]).isEqualTo("NYC");
+    assertThat(results.get(1)[0]).isEqualTo(2);
+    assertThat(results.get(1)[1]).isEqualTo("tag3");
+    assertThat(results.get(1)[2]).isEqualTo("SF");
+
+    sql("DROP TABLE %s", lancetb);
+  }
+
+  @Test
+  public void testReadWritePerformance() {
+    String lancetb = getTableNameWithRandomSuffix();
+
+    // Create table
+    sql(
+        "CREATE TABLE %s (id BIGINT, value DOUBLE, data STRING) LOCATION '%s' 
TBLPROPERTIES ('provider' = 'lance')",
+        lancetb, getTableLocation(lancetb));
+
+    // Insert batch of data
+    StringBuilder insertSql = new StringBuilder("INSERT INTO " + lancetb + " 
VALUES ");
+    for (int i = 0; i < 100; i++) {
+      if (i > 0) insertSql.append(", ");
+      insertSql.append(String.format("(%d, %f, 'data_%d')", i, i * 1.5, i));
+    }
+    sql(insertSql.toString());
+
+    // Verify count
+    List<Object[]> results = sql("SELECT COUNT(*) FROM %s", lancetb);
+    assertThat(results.get(0)[0]).isEqualTo(100L);
+
+    // Test filtering
+    results = sql("SELECT * FROM %s WHERE id >= 90 ORDER BY id", lancetb);
+    assertThat(results.size()).isEqualTo(10);
+    assertThat(results.get(0)).isEqualTo(new Object[] {90L, 135.0, "data_90"});
+
+    sql("DROP TABLE %s", lancetb);
+  }
+
+  @Test
+  public void testMixedTableFormats() {
+    // Test that Lance tables can coexist with Iceberg tables
+    String icebergTable = "iceberg_table";
+    String lanceTable = "lance_table";
+
+    // Create an Iceberg table (default provider)
+    sql(
+        "CREATE TABLE %s (id INT, name STRING) LOCATION '%s'",
+        icebergTable, getTableLocation(icebergTable));
+    sql("INSERT INTO %s VALUES (1, 'iceberg_data')", icebergTable);
+
+    // Create a Lance table
+    sql(
+        "CREATE TABLE %s (id INT, name STRING) LOCATION '%s' TBLPROPERTIES 
('provider' = 'lance')",

Review Comment:
   Will we be able to have all three formats — including Iceberg, Delta, and
Lance?



##########
plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkLanceIT.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.quarkus.it;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.polaris.service.it.env.IntegrationTestsHelper;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+@QuarkusIntegrationTest
+public class SparkLanceIT extends SparkIntegrationBase {
+  private String defaultNs;
+  private String tableRootDir;
+
+  private String getTableLocation(String tableName) {
+    return String.format("%s/%s", tableRootDir, tableName);
+  }
+
+  private String getTableNameWithRandomSuffix() {
+    return generateName("lancetb");
+  }
+
+  @BeforeEach
+  public void createDefaultResources(@TempDir Path tempDir) {
+    spark.sparkContext().setLogLevel("WARN");
+    defaultNs = generateName("lance");
+    // create a default namespace
+    sql("CREATE NAMESPACE %s", defaultNs);
+    sql("USE NAMESPACE %s", defaultNs);
+    tableRootDir =
+        
IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve(defaultNs).getPath();
+  }
+
+  @AfterEach
+  public void cleanupLanceData() {
+    // clean up lance data
+    File dirToDelete = new File(tableRootDir);
+    FileUtils.deleteQuietly(dirToDelete);
+    sql("DROP NAMESPACE %s", defaultNs);
+  }
+
+  @Test
+  public void testBasicTableOperations() {
+    // create a regular lance table with provider property
+    String lancetb1 = "lancetb1";
+    sql(
+        "CREATE TABLE %s (id INT, name STRING) LOCATION '%s' TBLPROPERTIES 
('provider' = 'lance')",

Review Comment:
   Today, the Delta integration doesn't support CTAS syntax, because the Delta
catalog doesn't support CTAS with a REST (Unity) catalog. For Lance, is CTAS
working?
   Based on the test below, I see
   ```
   df.write()
           .mode("overwrite")
           .format("lance")
           .option("path", getTableLocation(lancetb1))
           .saveAsTable(lancetb1);
   ```
   I assume the CTAS syntax works. If so, can we add one test covering the CTAS
syntax?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to