This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a453fd5  [SPARK-36914][SQL] Implement dropIndex and listIndexes in JDBC (MySQL dialect)
a453fd5 is described below

commit a453fd55dd37516fbfb9332cf43e360796dfb955
Author: Huaxin Gao <[email protected]>
AuthorDate: Tue Oct 12 22:36:47 2021 +0800

    [SPARK-36914][SQL] Implement dropIndex and listIndexes in JDBC (MySQL dialect)
    
    ### What changes were proposed in this pull request?
    This PR implements `dropIndex` and `listIndexes` in the MySQL dialect.
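
    A minimal sketch (not from this commit) of what these calls compile to in the
    MySQL dialect; the JDBC URL and the index/table names are illustrative:
    ```scala
    import org.apache.spark.sql.jdbc.JdbcDialects

    // Any jdbc:mysql URL selects MySQLDialect.
    val dialect = JdbcDialects.get("jdbc:mysql://host/db")
    // Builds the string: DROP INDEX `i1` ON tbl
    val dropSql = dialect.dropIndex("i1", "tbl")
    // listIndexes(conn, "tbl", options) issues SHOW INDEXES FROM tbl on a live connection.
    ```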
    
    ### Why are the changes needed?
    As a subtask of the V2 Index support, this PR completes the implementation for JDBC V2 index support.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, `dropIndex/listIndexes` in DS V2 JDBC
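
    A hedged usage sketch mirroring the new tests (assumes a catalog named `mysql`
    is registered and `conf` is the session's SQLConf, as in the integration tests):
    ```scala
    import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
    import org.apache.spark.sql.connector.catalog.index.SupportsIndex

    val jdbcTable = Catalogs.load("mysql", conf).asInstanceOf[TableCatalog]
      .loadTable(Identifier.of(Array.empty[String], "new_table"))
      .asInstanceOf[SupportsIndex]
    jdbcTable.dropIndex("i1")             // throws NoSuchIndexException if i1 is absent
    val indexes = jdbcTable.listIndexes() // Array[TableIndex]
    ```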
    
    ### How was this patch tested?
    new tests
    
    Closes #34236 from huaxingao/listIndexJDBC.
    
    Authored-by: Huaxin Gao <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala  | 33 +++-----
 .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala  | 95 +++++++++++++++++++++-
 .../sql/connector/catalog/index/SupportsIndex.java |  7 +-
 .../sql/connector/catalog/index/TableIndex.java    | 12 ++-
 .../catalyst/analysis/NoSuchItemException.scala    |  4 +-
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 24 ++++++
 .../execution/datasources/v2/jdbc/JDBCTable.scala  | 13 ++-
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   | 25 +++++-
 .../org/apache/spark/sql/jdbc/MySQLDialect.scala   | 84 +++++++++++++++----
 9 files changed, 239 insertions(+), 58 deletions(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index 3cb8787..67e8108 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -24,8 +24,6 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
-import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
@@ -121,31 +119,22 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
     assert(t.schema === expectedSchema)
   }
 
-  override def testIndex(tbl: String): Unit = {
-    val loaded = Catalogs.load("mysql", conf)
-    val jdbcTable = loaded.asInstanceOf[TableCatalog]
-      .loadTable(Identifier.of(Array.empty[String], "new_table"))
-      .asInstanceOf[SupportsIndex]
-    assert(jdbcTable.indexExists("i1") == false)
-    assert(jdbcTable.indexExists("i2") == false)
+  override def supportsIndex: Boolean = true
 
+  override def testIndexProperties(jdbcTable: SupportsIndex): Unit = {
     val properties = new util.Properties();
     properties.put("KEY_BLOCK_SIZE", "10")
     properties.put("COMMENT", "'this is a comment'")
-    jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
+    // MySQL doesn't allow properties to be set on an individual column, so use an
+    // empty Array for column properties.
+    jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
       Array.empty[util.Map[NamedReference, util.Properties]], properties)
 
-    jdbcTable.createIndex("i2", "",
-      Array(FieldReference("col2"), FieldReference("col3"), 
FieldReference("col5")),
-      Array.empty[util.Map[NamedReference, util.Properties]], new 
util.Properties)
-
-    assert(jdbcTable.indexExists("i1") == true)
-    assert(jdbcTable.indexExists("i2") == true)
-
-    val m = intercept[IndexAlreadyExistsException] {
-      jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
-        Array.empty[util.Map[NamedReference, util.Properties]], properties)
-    }.getMessage
-    assert(m.contains("Failed to create index: i1 in new_table"))
+    var index = jdbcTable.listIndexes()
+    // The index property size is actually 1: even though the index is created
+    // with properties "KEY_BLOCK_SIZE", "10" and "COMMENT", "'this is a comment'",
+    // MySQL only returns `COMMENT` when the index is retrieved using `SHOW INDEXES`.
+    assert(index(0).properties.size == 1)
+    assert(index(0).properties.get("COMMENT").equals("this is a comment"))
   }
 }
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index da57ed7..f3e3b34 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -17,9 +17,15 @@
 
 package org.apache.spark.sql.jdbc.v2
 
+import java.util
+
 import org.apache.log4j.Level
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
+import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
+import org.apache.spark.sql.connector.catalog.index.SupportsIndex
+import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -181,12 +187,93 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     }
   }
 
-  def testIndex(tbl: String): Unit = {}
+  def supportsIndex: Boolean = false
+  def testIndexProperties(jdbcTable: SupportsIndex): Unit = {}
 
   test("SPARK-36913: Test INDEX") {
-    withTable(s"$catalogName.new_table") {
-      sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT, 
col4 INT, col5 INT)")
-      testIndex(s"$catalogName.new_table")
+    if (supportsIndex) {
+      withTable(s"$catalogName.new_table") {
+        sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 
INT," +
+          s" col4 INT, col5 INT)")
+        val loaded = Catalogs.load(catalogName, conf)
+        val jdbcTable = loaded.asInstanceOf[TableCatalog]
+          .loadTable(Identifier.of(Array.empty[String], "new_table"))
+          .asInstanceOf[SupportsIndex]
+        assert(jdbcTable.indexExists("i1") == false)
+        assert(jdbcTable.indexExists("i2") == false)
+
+        val properties = new util.Properties()
+        val indexType = "DUMMY"
+        var m = intercept[UnsupportedOperationException] {
+          jdbcTable.createIndex("i1", indexType, Array(FieldReference("col1")),
+            Array.empty[util.Map[NamedReference, util.Properties]], properties)
+        }.getMessage
+        assert(m.contains(s"Index Type $indexType is not supported." +
+          s" The supported Index Types are: BTREE and HASH"))
+
+        jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
+          Array.empty[util.Map[NamedReference, util.Properties]], properties)
+
+        jdbcTable.createIndex("i2", "",
+          Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")),
+          Array.empty[util.Map[NamedReference, util.Properties]], properties)
+
+        assert(jdbcTable.indexExists("i1") == true)
+        assert(jdbcTable.indexExists("i2") == true)
+
+        m = intercept[IndexAlreadyExistsException] {
+          jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
+            Array.empty[util.Map[NamedReference, util.Properties]], properties)
+        }.getMessage
+        assert(m.contains("Failed to create index: i1 in new_table"))
+
+        var index = jdbcTable.listIndexes()
+        assert(index.length == 2)
+
+        assert(index(0).indexName.equals("i1"))
+        assert(index(0).indexType.equals("BTREE"))
+        var cols = index(0).columns
+        assert(cols.length == 1)
+        assert(cols(0).describe().equals("col1"))
+        assert(index(0).properties.size == 0)
+
+        assert(index(1).indexName.equals("i2"))
+        assert(index(1).indexType.equals("BTREE"))
+        cols = index(1).columns
+        assert(cols.length == 3)
+        assert(cols(0).describe().equals("col2"))
+        assert(cols(1).describe().equals("col3"))
+        assert(cols(2).describe().equals("col5"))
+        assert(index(1).properties.size == 0)
+
+        jdbcTable.dropIndex("i1")
+        assert(jdbcTable.indexExists("i1") == false)
+        assert(jdbcTable.indexExists("i2") == true)
+
+        index = jdbcTable.listIndexes()
+        assert(index.length == 1)
+
+        assert(index(0).indexName.equals("i2"))
+        assert(index(0).indexType.equals("BTREE"))
+        cols = index(0).columns
+        assert(cols.length == 3)
+        assert(cols(0).describe().equals("col2"))
+        assert(cols(1).describe().equals("col3"))
+        assert(cols(2).describe().equals("col5"))
+
+        jdbcTable.dropIndex("i2")
+        assert(jdbcTable.indexExists("i1") == false)
+        assert(jdbcTable.indexExists("i2") == false)
+        index = jdbcTable.listIndexes()
+        assert(index.length == 0)
+
+        m = intercept[NoSuchIndexException] {
+          jdbcTable.dropIndex("i2")
+        }.getMessage
+        assert(m.contains("Failed to drop index: i2"))
+
+        testIndexProperties(jdbcTable)
+      }
     }
   }
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
index 24961e4..4181cf5 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
@@ -42,7 +42,7 @@ public interface SupportsIndex extends Table {
    * @param columns the columns on which index to be created
   * @param columnsProperties the properties of the columns on which index to be created
    * @param properties the properties of the index to be created
-   * @throws IndexAlreadyExistsException If the index already exists (optional)
+   * @throws IndexAlreadyExistsException If the index already exists.
    */
   void createIndex(String indexName,
       String indexType,
@@ -55,10 +55,9 @@ public interface SupportsIndex extends Table {
    * Drops the index with the given name.
    *
    * @param indexName the name of the index to be dropped.
-   * @return true if the index is dropped
-   * @throws NoSuchIndexException If the index does not exist (optional)
+   * @throws NoSuchIndexException If the index does not exist.
    */
-  boolean dropIndex(String indexName) throws NoSuchIndexException;
+  void dropIndex(String indexName) throws NoSuchIndexException;
 
   /**
    * Checks whether an index exists in this table.
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java
index 99fce80..977ed8d 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java
@@ -53,27 +53,25 @@ public final class TableIndex {
   /**
    * @return the Index name.
    */
-  String indexName() { return indexName; }
+  public String indexName() { return indexName; }
 
   /**
    * @return the indexType of this Index.
    */
-  String indexType() { return indexType; }
+  public String indexType() { return indexType; }
 
   /**
   * @return the column(s) this Index is on. Could be multi columns (a multi-column index).
    */
-  NamedReference[] columns() { return columns; }
+  public NamedReference[] columns() { return columns; }
 
   /**
    * @return the map of column and column property map.
    */
-  Map<NamedReference, Properties> columnProperties() { return 
columnProperties; }
+  public Map<NamedReference, Properties> columnProperties() { return 
columnProperties; }
 
   /**
    * Returns the index properties.
    */
-  Properties properties() {
-    return properties;
-  }
+  public Properties properties() { return properties; }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
index 500121c..805f308 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
@@ -108,5 +108,5 @@ case class NoSuchPartitionsException(override val message: String)
 case class NoSuchTempFunctionException(func: String)
   extends AnalysisException(s"Temporary function '$func' not found")
 
-class NoSuchIndexException(indexName: String)
-  extends AnalysisException(s"Index '$indexName' not found")
+class NoSuchIndexException(message: String, cause: Option[Throwable] = None)
+  extends AnalysisException(message, cause = cause)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 168d16a..d482245 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
 import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
@@ -1040,6 +1041,29 @@ object JdbcUtils extends Logging with SQLConfHelper {
     dialect.indexExists(conn, indexName, tableName, options)
   }
 
+  /**
+   * Drop an index.
+   */
+  def dropIndex(
+      conn: Connection,
+      indexName: String,
+      tableName: String,
+      options: JDBCOptions): Unit = {
+    val dialect = JdbcDialects.get(options.url)
+    executeStatement(conn, options, dialect.dropIndex(indexName, tableName))
+  }
+
+  /**
+   * List all the indexes in a table.
+   */
+  def listIndexes(
+      conn: Connection,
+      tableName: String,
+      options: JDBCOptions): Array[TableIndex] = {
+    val dialect = JdbcDialects.get(options.url)
+    dialect.listIndexes(conn, tableName, options)
+  }
+
   private def executeStatement(conn: Connection, options: JDBCOptions, sql: String): Unit = {
     val statement = conn.createStatement
     try {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
index 1db938e..23ff503 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
@@ -73,11 +73,18 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
     }
   }
 
-  override def dropIndex(indexName: String): Boolean = {
-    throw new UnsupportedOperationException("dropIndex is not supported yet")
+  override def dropIndex(indexName: String): Unit = {
+    JdbcUtils.withConnection(jdbcOptions) { conn =>
+      JdbcUtils.classifyException(s"Failed to drop index: $indexName",
+        JdbcDialects.get(jdbcOptions.url)) {
+        JdbcUtils.dropIndex(conn, indexName, name, jdbcOptions)
+      }
+    }
   }
 
   override def listIndexes(): Array[TableIndex] = {
-    throw new UnsupportedOperationException("listIndexes is not supported yet")
+    JdbcUtils.withConnection(jdbcOptions) { conn =>
+      JdbcUtils.listIndexes(conn, name, jdbcOptions)
+    }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index d1c4f8d..eb3986c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.TableChange._
+import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
@@ -290,7 +291,7 @@ abstract class JdbcDialect extends Serializable with Logging{
   }
 
   /**
-   * Creates an index.
+   * Builds a create index SQL statement.
    *
    * @param indexName         the name of the index to be created
    * @param indexType         the type of the index to be created
@@ -298,6 +299,7 @@ abstract class JdbcDialect extends Serializable with Logging{
    * @param columns           the columns on which index to be created
   * @param columnsProperties the properties of the columns on which index to be created
    * @param properties        the properties of the index to be created
+   * @return                  the SQL statement to use for creating the index.
    */
   def createIndex(
       indexName: String,
@@ -327,6 +329,27 @@ abstract class JdbcDialect extends Serializable with 
Logging{
   }
 
   /**
+   * Builds a drop index SQL statement.
+   *
+   * @param indexName the name of the index to be dropped.
+   * @param tableName the table name on which the index is to be dropped.
+   * @return the SQL statement to use for dropping the index.
+   */
+  def dropIndex(indexName: String, tableName: String): String = {
+    throw new UnsupportedOperationException("dropIndex is not supported")
+  }
+
+  /**
+   * Lists all the indexes in this table.
+   */
+  def listIndexes(
+      conn: Connection,
+      tableName: String,
+      options: JDBCOptions): Array[TableIndex] = {
+    throw new UnsupportedOperationException("listIndexes is not supported")
+  }
+
+  /**
   * Gets a dialect exception, classifies it and wraps it by `AnalysisException`.
    * @param message The error message to be placed to the returned exception.
    * @param e The dialect specific exception.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 5c16ef6..7e85b3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -25,8 +25,9 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
-import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
+import org.apache.spark.sql.connector.catalog.index.TableIndex
+import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.types.{BooleanType, DataType, FloatType, LongType, MetadataBuilder}
@@ -127,10 +128,19 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
         indexProperties = indexProperties + " " + s"$k $v"
       }
     }
-
+    val iType = if (indexType.isEmpty) {
+      ""
+    } else {
+      if (!indexType.equalsIgnoreCase("BTREE") && !indexType.equalsIgnoreCase("HASH")) {
+        throw new UnsupportedOperationException(s"Index Type $indexType is not supported." +
+          " The supported Index Types are: BTREE and HASH")
+      }
+      s"USING $indexType"
+    }
     // columnsProperties doesn't apply to MySQL so it is ignored
-    s"CREATE $indexType INDEX ${quoteIdentifier(indexName)} ON" +
-      s" ${quoteIdentifier(tableName)}" + s" (${columnList.mkString(", ")}) 
$indexProperties"
+    s"CREATE INDEX ${quoteIdentifier(indexName)} $iType ON" +
+      s" ${quoteIdentifier(tableName)} (${columnList.mkString(", ")}) 
$indexProperties"
   }
 
   // SHOW INDEX syntax
@@ -157,17 +167,61 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
     }
   }
 
-  override def classifyException(message: String, e: Throwable): AnalysisException = {
-    if (e.isInstanceOf[SQLException]) {
-      // Error codes are from
-      // https://mariadb.com/kb/en/mariadb-error-codes/#shared-mariadbmysql-error-codes
-      e.asInstanceOf[SQLException].getErrorCode match {
-        // ER_DUP_KEYNAME
-        case 1061 =>
-          throw new IndexAlreadyExistsException(message, cause = Some(e))
-        case _ =>
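+  // DROP INDEX syntax
+  // https://dev.mysql.com/doc/refman/8.0/en/drop-index.html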
+  override def dropIndex(indexName: String, tableName: String): String = {
+    s"DROP INDEX ${quoteIdentifier(indexName)} ON $tableName"
+  }
+
+  // SHOW INDEX syntax
+  // https://dev.mysql.com/doc/refman/8.0/en/show-index.html
+  override def listIndexes(
+      conn: Connection,
+      tableName: String,
+      options: JDBCOptions): Array[TableIndex] = {
+    val sql = s"SHOW INDEXES FROM $tableName"
+    var indexMap: Map[String, TableIndex] = Map()
+    try {
+      val rs = JdbcUtils.executeQuery(conn, options, sql)
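+      // SHOW INDEXES returns one row per indexed column, so rows that share a
+      // key_name are merged below into a single multi-column TableIndex.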
+      while (rs.next()) {
+        val indexName = rs.getString("key_name")
+        val colName = rs.getString("column_name")
+        val indexType = rs.getString("index_type")
+        val indexComment = rs.getString("Index_comment")
+        if (indexMap.contains(indexName)) {
+          val index = indexMap(indexName)
+          val newIndex = new TableIndex(indexName, indexType,
+            index.columns() :+ FieldReference(colName),
+            index.columnProperties, index.properties)
+          indexMap += (indexName -> newIndex)
+        } else {
+          // The only property we are building here is `COMMENT` because it's
+          // the only one we can get from `SHOW INDEXES`.
+          val properties = new util.Properties()
+          if (indexComment.nonEmpty) properties.put("COMMENT", indexComment)
+          val index = new TableIndex(indexName, indexType, Array(FieldReference(colName)),
+            new util.HashMap[NamedReference, util.Properties](), properties)
+          indexMap += (indexName -> index)
+        }
       }
+    } catch {
+      case _: Exception =>
+        logWarning("Cannot retrieve index info.")
+    }
+    indexMap.values.toArray
+  }
+
+  override def classifyException(message: String, e: Throwable): AnalysisException = {
+    e match {
+      case sqlException: SQLException =>
+        sqlException.getErrorCode match {
+          // ER_DUP_KEYNAME
+          case 1061 =>
+            throw new IndexAlreadyExistsException(message, cause = Some(e))
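+          // ER_CANT_DROP_FIELD_OR_KEY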
+          case 1091 =>
+            throw new NoSuchIndexException(message, cause = Some(e))
+          case _ => super.classifyException(message, e)
+        }
+      case unsupported: UnsupportedOperationException => throw unsupported
+      case _ => super.classifyException(message, e)
     }
-    super.classifyException(message, e)
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
