Repository: spark Updated Branches: refs/heads/master 015f7ef50 -> 12b7191d2
[SPARK-10855] [SQL] Add a JDBC dialect for Apache Derby marmbrus rxin This patch adds a JdbcDialect class, which customizes the datatype mappings for Derby backends. The patch also adds unit tests for the new dialect, corresponding to the existing tests for other JDBC dialects. JDBCSuite runs cleanly for me with this patch. So does JDBCWriteSuite, although it produces noise as described here: https://issues.apache.org/jira/browse/SPARK-10890 This patch is my original work, which I license to the ASF. I am a Derby contributor, so my ICLA is on file under SVN id "rhillegas": http://people.apache.org/committer-index.html Touches the following files: --------------------------------- org.apache.spark.sql.jdbc.JdbcDialects Adds a DerbyDialect. --------------------------------- org.apache.spark.sql.jdbc.JDBCSuite Adds unit tests for the new DerbyDialect. Author: Rick Hillegas <[email protected]> Closes #8982 from rick-ibm/b_10855. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12b7191d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12b7191d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12b7191d Branch: refs/heads/master Commit: 12b7191d2075ae870c73529de450cbb5725872ec Parents: 015f7ef Author: Rick Hillegas <[email protected]> Authored: Fri Oct 9 13:36:51 2015 -0700 Committer: Reynold Xin <[email protected]> Committed: Fri Oct 9 13:36:51 2015 -0700 ---------------------------------------------------------------------- .../apache/spark/sql/jdbc/JdbcDialects.scala | 28 ++++++++++++++++++++ .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 14 +++++++++- 2 files changed, 41 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/12b7191d/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 0cd356f..a2ff4cc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -138,6 +138,7 @@ object JdbcDialects { registerDialect(PostgresDialect) registerDialect(DB2Dialect) registerDialect(MsSqlServerDialect) + registerDialect(DerbyDialect) /** @@ -287,3 +288,30 @@ case object MsSqlServerDialect extends JdbcDialect { case _ => None } } + +/** + * :: DeveloperApi :: + * Default Apache Derby dialect, mapping real on read + * and string/byte/short/boolean/decimal on write. + */ +@DeveloperApi +case object DerbyDialect extends JdbcDialect { + override def canHandle(url: String): Boolean = url.startsWith("jdbc:derby") + override def getCatalystType( + sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = { + if (sqlType == Types.REAL) Option(FloatType) else None + } + + override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { + case StringType => Some(JdbcType("CLOB", java.sql.Types.CLOB)) + case ByteType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT)) + case ShortType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT)) + case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN)) + // 31 is the maximum precision and 5 is the default scale for a Derby DECIMAL + case (t: DecimalType) if (t.precision > 31) => + Some(JdbcType("DECIMAL(31,5)", java.sql.Types.DECIMAL)) + case _ => None + } + +} + http://git-wip-us.apache.org/repos/asf/spark/blob/12b7191d/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index bbf705c..d530b1a 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -409,18 +409,22 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext assert(JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") == PostgresDialect) assert(JdbcDialects.get("jdbc:db2://127.0.0.1/db") == DB2Dialect) assert(JdbcDialects.get("jdbc:sqlserver://127.0.0.1/db") == MsSqlServerDialect) + assert(JdbcDialects.get("jdbc:derby:db") == DerbyDialect) assert(JdbcDialects.get("test.invalid") == NoopDialect) } test("quote column names by jdbc dialect") { val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db") val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") + val Derby = JdbcDialects.get("jdbc:derby:db") val columns = Seq("abc", "key") val MySQLColumns = columns.map(MySQL.quoteIdentifier(_)) val PostgresColumns = columns.map(Postgres.quoteIdentifier(_)) + val DerbyColumns = columns.map(Derby.quoteIdentifier(_)) assert(MySQLColumns === Seq("`abc`", "`key`")) assert(PostgresColumns === Seq(""""abc"""", """"key"""")) + assert(DerbyColumns === Seq(""""abc"""", """"key"""")) } test("Dialect unregister") { @@ -454,16 +458,23 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext test("PostgresDialect type mapping") { val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") - // SPARK-7869: Testing JSON types handling assert(Postgres.getCatalystType(java.sql.Types.OTHER, "json", 1, null) === Some(StringType)) assert(Postgres.getCatalystType(java.sql.Types.OTHER, "jsonb", 1, null) === Some(StringType)) } + test("DerbyDialect jdbc type mapping") { + val derbyDialect = JdbcDialects.get("jdbc:derby:db") + assert(derbyDialect.getJDBCType(StringType).map(_.databaseTypeDefinition).get == "CLOB") + assert(derbyDialect.getJDBCType(ByteType).map(_.databaseTypeDefinition).get == "SMALLINT") + 
assert(derbyDialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "BOOLEAN") + } + test("table exists query by jdbc dialect") { val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db") val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") val db2 = JdbcDialects.get("jdbc:db2://127.0.0.1/db") val h2 = JdbcDialects.get(url) + val derby = JdbcDialects.get("jdbc:derby:db") val table = "weblogs" val defaultQuery = s"SELECT * FROM $table WHERE 1=0" val limitQuery = s"SELECT 1 FROM $table LIMIT 1" @@ -471,5 +482,6 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext assert(Postgres.getTableExistsQuery(table) == limitQuery) assert(db2.getTableExistsQuery(table) == defaultQuery) assert(h2.getTableExistsQuery(table) == defaultQuery) + assert(derby.getTableExistsQuery(table) == defaultQuery) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org
