This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a453fd5 [SPARK-36914][SQL] Implement dropIndex and listIndexes in
JDBC (MySQL dialect)
a453fd5 is described below
commit a453fd55dd37516fbfb9332cf43e360796dfb955
Author: Huaxin Gao <[email protected]>
AuthorDate: Tue Oct 12 22:36:47 2021 +0800
[SPARK-36914][SQL] Implement dropIndex and listIndexes in JDBC (MySQL
dialect)
### What changes were proposed in this pull request?
This PR implements `dropIndex` and `listIndexes` in MySQL dialect
### Why are the changes needed?
As a subtask of the V2 Index support, this PR completes the implementation
for JDBC V2 index support.
### Does this PR introduce _any_ user-facing change?
Yes, `dropIndex/listIndexes` in DS V2 JDBC
### How was this patch tested?
new tests
Closes #34236 from huaxingao/listIndexJDBC.
Authored-by: Huaxin Gao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala | 33 +++-----
.../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 95 +++++++++++++++++++++-
.../sql/connector/catalog/index/SupportsIndex.java | 7 +-
.../sql/connector/catalog/index/TableIndex.java | 12 ++-
.../catalyst/analysis/NoSuchItemException.scala | 4 +-
.../sql/execution/datasources/jdbc/JdbcUtils.scala | 24 ++++++
.../execution/datasources/v2/jdbc/JDBCTable.scala | 13 ++-
.../org/apache/spark/sql/jdbc/JdbcDialects.scala | 25 +++++-
.../org/apache/spark/sql/jdbc/MySQLDialect.scala | 84 +++++++++++++++----
9 files changed, 239 insertions(+), 58 deletions(-)
diff --git
a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index 3cb8787..67e8108 100644
---
a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++
b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -24,8 +24,6 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkConf
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
-import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier,
TableCatalog}
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.{FieldReference,
NamedReference}
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
@@ -121,31 +119,22 @@ class MySQLIntegrationSuite extends
DockerJDBCIntegrationSuite with V2JDBCTest {
assert(t.schema === expectedSchema)
}
- override def testIndex(tbl: String): Unit = {
- val loaded = Catalogs.load("mysql", conf)
- val jdbcTable = loaded.asInstanceOf[TableCatalog]
- .loadTable(Identifier.of(Array.empty[String], "new_table"))
- .asInstanceOf[SupportsIndex]
- assert(jdbcTable.indexExists("i1") == false)
- assert(jdbcTable.indexExists("i2") == false)
+ override def supportsIndex: Boolean = true
+ override def testIndexProperties(jdbcTable: SupportsIndex): Unit = {
val properties = new util.Properties();
properties.put("KEY_BLOCK_SIZE", "10")
properties.put("COMMENT", "'this is a comment'")
- jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
+ // MySQL doesn't allow property set on individual column, so use empty
Array for
+ // column properties
+ jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
Array.empty[util.Map[NamedReference, util.Properties]], properties)
- jdbcTable.createIndex("i2", "",
- Array(FieldReference("col2"), FieldReference("col3"),
FieldReference("col5")),
- Array.empty[util.Map[NamedReference, util.Properties]], new
util.Properties)
-
- assert(jdbcTable.indexExists("i1") == true)
- assert(jdbcTable.indexExists("i2") == true)
-
- val m = intercept[IndexAlreadyExistsException] {
- jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
- Array.empty[util.Map[NamedReference, util.Properties]], properties)
- }.getMessage
- assert(m.contains("Failed to create index: i1 in new_table"))
+ var index = jdbcTable.listIndexes()
+ // The index property size is actually 1. Even though the index is created
+ // with properties "KEY_BLOCK_SIZE", "10" and "COMMENT", "'this is a
comment'", when
+ // retrieving index using `SHOW INDEXES`, MySQL only returns `COMMENT`.
+ assert(index(0).properties.size == 1)
+ assert(index(0).properties.get("COMMENT").equals("this is a comment"))
}
}
diff --git
a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index da57ed7..f3e3b34 100644
---
a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++
b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -17,9 +17,15 @@
package org.apache.spark.sql.jdbc.v2
+import java.util
+
import org.apache.log4j.Level
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException,
NoSuchIndexException}
+import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier,
TableCatalog}
+import org.apache.spark.sql.connector.catalog.index.SupportsIndex
+import org.apache.spark.sql.connector.expressions.{FieldReference,
NamedReference}
import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -181,12 +187,93 @@ private[v2] trait V2JDBCTest extends SharedSparkSession
with DockerIntegrationFu
}
}
- def testIndex(tbl: String): Unit = {}
+ def supportsIndex: Boolean = false
+ def testIndexProperties(jdbcTable: SupportsIndex): Unit = {}
test("SPARK-36913: Test INDEX") {
- withTable(s"$catalogName.new_table") {
- sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT,
col4 INT, col5 INT)")
- testIndex(s"$catalogName.new_table")
+ if (supportsIndex) {
+ withTable(s"$catalogName.new_table") {
+ sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3
INT," +
+ s" col4 INT, col5 INT)")
+ val loaded = Catalogs.load(catalogName, conf)
+ val jdbcTable = loaded.asInstanceOf[TableCatalog]
+ .loadTable(Identifier.of(Array.empty[String], "new_table"))
+ .asInstanceOf[SupportsIndex]
+ assert(jdbcTable.indexExists("i1") == false)
+ assert(jdbcTable.indexExists("i2") == false)
+
+ val properties = new util.Properties();
+ val indexType = "DUMMY"
+ var m = intercept[UnsupportedOperationException] {
+ jdbcTable.createIndex("i1", indexType, Array(FieldReference("col1")),
+ Array.empty[util.Map[NamedReference, util.Properties]], properties)
+ }.getMessage
+ assert(m.contains(s"Index Type $indexType is not supported." +
+ s" The supported Index Types are: BTREE and HASH"))
+
+ jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
+ Array.empty[util.Map[NamedReference, util.Properties]], properties)
+
+ jdbcTable.createIndex("i2", "",
+ Array(FieldReference("col2"), FieldReference("col3"),
FieldReference("col5")),
+ Array.empty[util.Map[NamedReference, util.Properties]], properties)
+
+ assert(jdbcTable.indexExists("i1") == true)
+ assert(jdbcTable.indexExists("i2") == true)
+
+ m = intercept[IndexAlreadyExistsException] {
+ jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
+ Array.empty[util.Map[NamedReference, util.Properties]], properties)
+ }.getMessage
+ assert(m.contains("Failed to create index: i1 in new_table"))
+
+ var index = jdbcTable.listIndexes()
+ assert(index.length == 2)
+
+ assert(index(0).indexName.equals("i1"))
+ assert(index(0).indexType.equals("BTREE"))
+ var cols = index(0).columns
+ assert(cols.length == 1)
+ assert(cols(0).describe().equals("col1"))
+ assert(index(0).properties.size == 0)
+
+ assert(index(1).indexName.equals("i2"))
+ assert(index(1).indexType.equals("BTREE"))
+ cols = index(1).columns
+ assert(cols.length == 3)
+ assert(cols(0).describe().equals("col2"))
+ assert(cols(1).describe().equals("col3"))
+ assert(cols(2).describe().equals("col5"))
+ assert(index(1).properties.size == 0)
+
+ jdbcTable.dropIndex("i1")
+ assert(jdbcTable.indexExists("i1") == false)
+ assert(jdbcTable.indexExists("i2") == true)
+
+ index = jdbcTable.listIndexes()
+ assert(index.length == 1)
+
+ assert(index(0).indexName.equals("i2"))
+ assert(index(0).indexType.equals("BTREE"))
+ cols = index(0).columns
+ assert(cols.length == 3)
+ assert(cols(0).describe().equals("col2"))
+ assert(cols(1).describe().equals("col3"))
+ assert(cols(2).describe().equals("col5"))
+
+ jdbcTable.dropIndex("i2")
+ assert(jdbcTable.indexExists("i1") == false)
+ assert(jdbcTable.indexExists("i2") == false)
+ index = jdbcTable.listIndexes()
+ assert(index.length == 0)
+
+ m = intercept[NoSuchIndexException] {
+ jdbcTable.dropIndex("i2")
+ }.getMessage
+ assert(m.contains("Failed to drop index: i2"))
+
+ testIndexProperties(jdbcTable)
+ }
}
}
}
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
index 24961e4..4181cf5 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
@@ -42,7 +42,7 @@ public interface SupportsIndex extends Table {
* @param columns the columns on which index to be created
* @param columnsProperties the properties of the columns on which index to
be created
* @param properties the properties of the index to be created
- * @throws IndexAlreadyExistsException If the index already exists (optional)
+ * @throws IndexAlreadyExistsException If the index already exists.
*/
void createIndex(String indexName,
String indexType,
@@ -55,10 +55,9 @@ public interface SupportsIndex extends Table {
* Drops the index with the given name.
*
* @param indexName the name of the index to be dropped.
- * @return true if the index is dropped
- * @throws NoSuchIndexException If the index does not exist (optional)
+ * @throws NoSuchIndexException If the index does not exist.
*/
- boolean dropIndex(String indexName) throws NoSuchIndexException;
+ void dropIndex(String indexName) throws NoSuchIndexException;
/**
* Checks whether an index exists in this table.
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java
index 99fce80..977ed8d 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java
@@ -53,27 +53,25 @@ public final class TableIndex {
/**
* @return the Index name.
*/
- String indexName() { return indexName; }
+ public String indexName() { return indexName; }
/**
* @return the indexType of this Index.
*/
- String indexType() { return indexType; }
+ public String indexType() { return indexType; }
/**
* @return the column(s) this Index is on. Could be multi columns (a
multi-column index).
*/
- NamedReference[] columns() { return columns; }
+ public NamedReference[] columns() { return columns; }
/**
* @return the map of column and column property map.
*/
- Map<NamedReference, Properties> columnProperties() { return
columnProperties; }
+ public Map<NamedReference, Properties> columnProperties() { return
columnProperties; }
/**
* Returns the index properties.
*/
- Properties properties() {
- return properties;
- }
+ public Properties properties() { return properties; }
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
index 500121c..805f308 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
@@ -108,5 +108,5 @@ case class NoSuchPartitionsException(override val message:
String)
case class NoSuchTempFunctionException(func: String)
extends AnalysisException(s"Temporary function '$func' not found")
-class NoSuchIndexException(indexName: String)
- extends AnalysisException(s"Index '$indexName' not found")
+class NoSuchIndexException(message: String, cause: Option[Throwable] = None)
+ extends AnalysisException(message, cause = cause)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 168d16a..d482245 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils,
GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros,
localDateToDays, toJavaDate, toJavaTimestamp}
import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.connector.catalog.index.TableIndex
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors}
import
org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
@@ -1040,6 +1041,29 @@ object JdbcUtils extends Logging with SQLConfHelper {
dialect.indexExists(conn, indexName, tableName, options)
}
+ /**
+ * Drop an index.
+ */
+ def dropIndex(
+ conn: Connection,
+ indexName: String,
+ tableName: String,
+ options: JDBCOptions): Unit = {
+ val dialect = JdbcDialects.get(options.url)
+ executeStatement(conn, options, dialect.dropIndex(indexName, tableName))
+ }
+
+ /**
+ * List all the indexes in a table.
+ */
+ def listIndexes(
+ conn: Connection,
+ tableName: String,
+ options: JDBCOptions): Array[TableIndex] = {
+ val dialect = JdbcDialects.get(options.url)
+ dialect.listIndexes(conn, tableName, options)
+ }
+
private def executeStatement(conn: Connection, options: JDBCOptions, sql:
String): Unit = {
val statement = conn.createStatement
try {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
index 1db938e..23ff503 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
@@ -73,11 +73,18 @@ case class JDBCTable(ident: Identifier, schema: StructType,
jdbcOptions: JDBCOpt
}
}
- override def dropIndex(indexName: String): Boolean = {
- throw new UnsupportedOperationException("dropIndex is not supported yet")
+ override def dropIndex(indexName: String): Unit = {
+ JdbcUtils.withConnection(jdbcOptions) { conn =>
+ JdbcUtils.classifyException(s"Failed to drop index: $indexName",
+ JdbcDialects.get(jdbcOptions.url)) {
+ JdbcUtils.dropIndex(conn, indexName, name, jdbcOptions)
+ }
+ }
}
override def listIndexes(): Array[TableIndex] = {
- throw new UnsupportedOperationException("listIndexes is not supported yet")
+ JdbcUtils.withConnection(jdbcOptions) { conn =>
+ JdbcUtils.listIndexes(conn, name, jdbcOptions)
+ }
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index d1c4f8d..eb3986c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils,
TimestampFormatter}
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.TableChange._
+import org.apache.spark.sql.connector.catalog.index.TableIndex
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
@@ -290,7 +291,7 @@ abstract class JdbcDialect extends Serializable with
Logging{
}
/**
- * Creates an index.
+ * Build a create index SQL statement.
*
* @param indexName the name of the index to be created
* @param indexType the type of the index to be created
@@ -298,6 +299,7 @@ abstract class JdbcDialect extends Serializable with
Logging{
* @param columns the columns on which index to be created
* @param columnsProperties the properties of the columns on which index to
be created
* @param properties the properties of the index to be created
+ * @return the SQL statement to use for creating the index.
*/
def createIndex(
indexName: String,
@@ -327,6 +329,27 @@ abstract class JdbcDialect extends Serializable with
Logging{
}
/**
+ * Build a drop index SQL statement.
+ *
+ * @param indexName the name of the index to be dropped.
+ * @param tableName the table name on which index to be dropped.
+ * @return the SQL statement to use for dropping the index.
+ */
+ def dropIndex(indexName: String, tableName: String): String = {
+ throw new UnsupportedOperationException("dropIndex is not supported")
+ }
+
+ /**
+ * Lists all the indexes in this table.
+ */
+ def listIndexes(
+ conn: Connection,
+ tableName: String,
+ options: JDBCOptions): Array[TableIndex] = {
+ throw new UnsupportedOperationException("listIndexes is not supported")
+ }
+
+ /**
* Gets a dialect exception, classifies it and wraps it by
`AnalysisException`.
* @param message The error message to be placed to the returned exception.
* @param e The dialect specific exception.
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 5c16ef6..7e85b3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -25,8 +25,9 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
-import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException,
NoSuchIndexException}
+import org.apache.spark.sql.connector.catalog.index.TableIndex
+import org.apache.spark.sql.connector.expressions.{FieldReference,
NamedReference}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.types.{BooleanType, DataType, FloatType, LongType,
MetadataBuilder}
@@ -127,10 +128,19 @@ private case object MySQLDialect extends JdbcDialect with
SQLConfHelper {
indexProperties = indexProperties + " " + s"$k $v"
}
}
-
+ val iType = if (indexType.isEmpty) {
+ ""
+ } else {
+ if (indexType.length > 1 && !indexType.equalsIgnoreCase("BTREE") &&
+ !indexType.equalsIgnoreCase("HASH")) {
+ throw new UnsupportedOperationException(s"Index Type $indexType is not
supported." +
+ " The supported Index Types are: BTREE and HASH")
+ }
+ s"USING $indexType"
+ }
// columnsProperties doesn't apply to MySQL so it is ignored
- s"CREATE $indexType INDEX ${quoteIdentifier(indexName)} ON" +
- s" ${quoteIdentifier(tableName)}" + s" (${columnList.mkString(", ")})
$indexProperties"
+ s"CREATE INDEX ${quoteIdentifier(indexName)} $iType ON" +
+ s" ${quoteIdentifier(tableName)} (${columnList.mkString(", ")})
$indexProperties"
}
// SHOW INDEX syntax
@@ -157,17 +167,61 @@ private case object MySQLDialect extends JdbcDialect with
SQLConfHelper {
}
}
- override def classifyException(message: String, e: Throwable):
AnalysisException = {
- if (e.isInstanceOf[SQLException]) {
- // Error codes are from
- //
https://mariadb.com/kb/en/mariadb-error-codes/#shared-mariadbmysql-error-codes
- e.asInstanceOf[SQLException].getErrorCode match {
- // ER_DUP_KEYNAME
- case 1061 =>
- throw new IndexAlreadyExistsException(message, cause = Some(e))
- case _ =>
+ override def dropIndex(indexName: String, tableName: String): String = {
+ s"DROP INDEX ${quoteIdentifier(indexName)} ON $tableName"
+ }
+
+ // SHOW INDEX syntax
+ // https://dev.mysql.com/doc/refman/8.0/en/show-index.html
+ override def listIndexes(
+ conn: Connection,
+ tableName: String,
+ options: JDBCOptions): Array[TableIndex] = {
+ val sql = s"SHOW INDEXES FROM $tableName"
+ var indexMap: Map[String, TableIndex] = Map()
+ try {
+ val rs = JdbcUtils.executeQuery(conn, options, sql)
+ while (rs.next()) {
+ val indexName = rs.getString("key_name")
+ val colName = rs.getString("column_name")
+ val indexType = rs.getString("index_type")
+ val indexComment = rs.getString("Index_comment")
+ if (indexMap.contains(indexName)) {
+ val index = indexMap.get(indexName).get
+ val newIndex = new TableIndex(indexName, indexType,
+ index.columns() :+ FieldReference(colName),
+ index.columnProperties, index.properties)
+ indexMap += (indexName -> newIndex)
+ } else {
+ // The only property we are building here is `COMMENT` because it's
the only one
+ // we can get from `SHOW INDEXES`.
+ val properties = new util.Properties();
+ if (indexComment.nonEmpty) properties.put("COMMENT", indexComment)
+ val index = new TableIndex(indexName, indexType,
Array(FieldReference(colName)),
+ new util.HashMap[NamedReference, util.Properties](), properties)
+ indexMap += (indexName -> index)
+ }
}
+ } catch {
+ case _: Exception =>
+ logWarning("Cannot retrieved index info.")
+ }
+ indexMap.values.toArray
+ }
+
+ override def classifyException(message: String, e: Throwable):
AnalysisException = {
+ e match {
+ case sqlException: SQLException =>
+ sqlException.getErrorCode match {
+ // ER_DUP_KEYNAME
+ case 1061 =>
+ throw new IndexAlreadyExistsException(message, cause = Some(e))
+ case 1091 =>
+ throw new NoSuchIndexException(message, cause = Some(e))
+ case _ => super.classifyException(message, e)
+ }
+ case unsupported: UnsupportedOperationException => throw unsupported
+ case _ => super.classifyException(message, e)
}
- super.classifyException(message, e)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]