This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6994bad5e6e [SPARK-44262][SQL] Add `dropTable` and `getInsertStatement` to JdbcDialect
6994bad5e6e is described below
commit 6994bad5e6ea8700d48cbe20e9b406b89925adc7
Author: Jia Fan <[email protected]>
AuthorDate: Mon Oct 16 13:55:24 2023 +0500
[SPARK-44262][SQL] Add `dropTable` and `getInsertStatement` to JdbcDialect
### What changes were proposed in this pull request?
1. This PR adds a `dropTable` function to `JdbcDialect`, so users can override the drop-table SQL in dialects for other databases such as Neo4j (see the sketch after the snippet below).
Neo4j drop case:
```sql
MATCH (m:Person {name: 'Mark'})
DELETE m
```
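For illustration, here is a minimal sketch of how a custom dialect could override the new `dropTable` hook to emit such a statement. `Neo4jDialect`, the URL prefix, and the Cypher string are hypothetical, not part of this PR:
```scala
import org.apache.spark.sql.jdbc.JdbcDialect

// Hypothetical dialect sketch: overrides the dropTable hook this PR adds.
// The Cypher-flavored string is illustrative, not a working Neo4j mapping.
case object Neo4jDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:neo4j")

  // Replace the default "DROP TABLE $table" with a Cypher-style delete.
  override def dropTable(table: String): String =
    s"MATCH (n:$table) DETACH DELETE n"
}
```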
2. It also adds `getInsertStatement` for the same reason (again, see the sketch below).
Neo4j insert case:
```sql
MATCH (p:Person {name: 'Jennifer'})
SET p.birthdate = date('1980-01-01')
RETURN p
```
Neo4j's query language (in fact named `CQL`) is not like normal SQL, but Neo4j does have a JDBC driver.
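Similarly, a minimal sketch of overriding the new insert hook (named `insertIntoTable` in the diff below) and registering the dialect; the dialect name, URL prefix, and CQL shape are again assumptions made for illustration:
```scala
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.types.StructField

// Hypothetical dialect sketch: overrides the insert hook this PR adds.
case object Neo4jCqlDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:neo4j")

  // Replace the default "INSERT INTO t (...) VALUES (?, ...)" template with a
  // Cypher-flavored CREATE; the "?" placeholders are kept so row values can
  // still be bound through the JDBC PreparedStatement.
  override def insertIntoTable(table: String, fields: Array[StructField]): String = {
    val props = fields.map(f => s"${f.name}: ?").mkString(", ")
    s"CREATE (:$table {$props})"
  }
}

// Register once so JdbcDialects.get(url) resolves the dialect for matching URLs.
JdbcDialects.registerDialect(Neo4jCqlDialect)
```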
### Why are the changes needed?
Makes `JdbcDialect` more useful: dialects can now customize how tables are dropped and how rows are inserted.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests.
Closes #41855 from Hisoka-X/SPARK-44262_JDBCUtils_improve.
Authored-by: Jia Fan <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
.../sql/execution/datasources/jdbc/JdbcUtils.scala | 14 +++++------
.../org/apache/spark/sql/jdbc/JdbcDialects.scala | 29 ++++++++++++++++++++++
2 files changed, 35 insertions(+), 8 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index fb9e11df188..f2b84810175 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -78,7 +78,8 @@ object JdbcUtils extends Logging with SQLConfHelper {
    * Drops a table from the JDBC database.
    */
   def dropTable(conn: Connection, table: String, options: JDBCOptions): Unit = {
-    executeStatement(conn, options, s"DROP TABLE $table")
+    val dialect = JdbcDialects.get(options.url)
+    executeStatement(conn, options, dialect.dropTable(table))
   }

   /**
@@ -114,22 +115,19 @@ object JdbcUtils extends Logging with SQLConfHelper {
       isCaseSensitive: Boolean,
       dialect: JdbcDialect): String = {
     val columns = if (tableSchema.isEmpty) {
-      rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
+      rddSchema.fields
     } else {
       // The generated insert statement needs to follow rddSchema's column sequence and
       // tableSchema's column names. When appending data into some case-sensitive DBMSs like
       // PostgreSQL/Oracle, we need to respect the existing case-sensitive column names instead of
       // RDD column names for user convenience.
-      val tableColumnNames = tableSchema.get.fieldNames
       rddSchema.fields.map { col =>
-        val normalizedName = tableColumnNames.find(f => conf.resolver(f, col.name)).getOrElse {
+        tableSchema.get.find(f => conf.resolver(f.name, col.name)).getOrElse {
           throw QueryCompilationErrors.columnNotFoundInSchemaError(col, tableSchema)
         }
-        dialect.quoteIdentifier(normalizedName)
-      }.mkString(",")
+      }
     }
-    val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
-    s"INSERT INTO $table ($columns) VALUES ($placeholders)"
+    dialect.insertIntoTable(table, columns)
   }

   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 22625523a04..37c378c294c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -193,6 +193,24 @@ abstract class JdbcDialect extends Serializable with Logging {
     statement.executeUpdate(s"CREATE TABLE $tableName ($strSchema) $createTableOptions")
   }

+  /**
+   * Returns an Insert SQL statement template for inserting a row into the target table via JDBC
+   * conn. Use "?" as placeholder for each value to be inserted.
+   * E.g. `INSERT INTO t ("name", "age", "gender") VALUES (?, ?, ?)`
+   *
+   * @param table The name of the table.
+   * @param fields The fields of the row that will be inserted.
+   * @return The SQL query to use for inserting data into the table.
+   */
+  @Since("4.0.0")
+  def insertIntoTable(
+      table: String,
+      fields: Array[StructField]): String = {
+    val placeholders = fields.map(_ => "?").mkString(",")
+    val columns = fields.map(x => quoteIdentifier(x.name)).mkString(",")
+    s"INSERT INTO $table ($columns) VALUES ($placeholders)"
+  }
+
   /**
    * Get the SQL query that should be used to find if the given table exists. Dialects can
    * override this method to return a query that works best in a particular database.
@@ -542,6 +560,17 @@ abstract class JdbcDialect extends Serializable with Logging {
     }
   }

+  /**
+   * Build a SQL statement to drop the given table.
+   *
+   * @param table the table name
+   * @return The SQL statement to use for dropping the table.
+   */
+  @Since("4.0.0")
+  def dropTable(table: String): String = {
+    s"DROP TABLE $table"
+  }
+
   /**
    * Build a create index SQL statement.
    *
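For reference, a quick sketch of what the new defaults produce, using a hypothetical table and schema (the H2 URL is only there to resolve some registered dialect; exact quoting depends on the dialect's `quoteIdentifier`):
```scala
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types.{IntegerType, StringType, StructField}

// Resolve a registered dialect and exercise the new default implementations.
val dialect = JdbcDialects.get("jdbc:h2:mem:testdb")
val fields = Array(StructField("name", StringType), StructField("age", IntegerType))

// With the default quoteIdentifier this prints something like:
//   INSERT INTO people ("name","age") VALUES (?,?)
println(dialect.insertIntoTable("people", fields))

// The default dropTable keeps the previous behavior: DROP TABLE people
println(dialect.dropTable("people"))
```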
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]