Repository: spark Updated Branches: refs/heads/master 3aff0866a -> b8f849b54
[SPARK-7869][SQL] Adding Postgres JSON and JSONb data types support This PR addresses [SPARK-7869](https://issues.apache.org/jira/browse/SPARK-7869). Before the patch, an attempt to load a table from Postgres with a JSON/JSONB column caused the error `java.sql.SQLException: Unsupported type 1111`. The Postgres data types JSON and JSONB are now mapped to String on the Spark side, so they can be loaded into a DataFrame and processed in Spark. Example Postgres: ``` create table test_json (id int, value json); create table test_jsonb (id int, value jsonb); insert into test_json (id, value) values (1, '{"field1":"value1","field2":"value2","field3":[1,2,3]}'::json), (2, '{"field1":"value3","field2":"value4","field3":[4,5,6]}'::json), (3, '{"field3":"value5","field4":"value6","field3":[7,8,9]}'::json); insert into test_jsonb (id, value) values (4, '{"field1":"value1","field2":"value2","field3":[1,2,3]}'::jsonb), (5, '{"field1":"value3","field2":"value4","field3":[4,5,6]}'::jsonb), (6, '{"field3":"value5","field4":"value6","field3":[7,8,9]}'::jsonb); ``` PySpark: ``` >>> import json >>> df1 = sqlContext.read.jdbc("jdbc:postgresql://127.0.0.1:5432/test?user=testuser", "test_json") >>> df1.map(lambda x: (x.id, json.loads(x.value))).map(lambda (id, value): (id, value.get('field3'))).collect() [(1, [1, 2, 3]), (2, [4, 5, 6]), (3, [7, 8, 9])] >>> df2 = sqlContext.read.jdbc("jdbc:postgresql://127.0.0.1:5432/test?user=testuser", "test_jsonb") >>> df2.map(lambda x: (x.id, json.loads(x.value))).map(lambda (id, value): (id, value.get('field1'))).collect() [(4, u'value1'), (5, u'value3'), (6, None)] ``` Author: 0x0FFF <[email protected]> Closes #8948 from 0x0FFF/SPARK-7869. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8f849b5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8f849b5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8f849b5 Branch: refs/heads/master Commit: b8f849b546739d3e4339563557509a51417fcb68 Parents: 3aff086 Author: 0x0FFF <[email protected]> Authored: Wed Oct 7 23:12:35 2015 -0700 Committer: Reynold Xin <[email protected]> Committed: Wed Oct 7 23:12:35 2015 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 4 ++++ .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 7 +++++++ 2 files changed, 11 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b8f849b5/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 5abbfbf..0cd356f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -202,6 +202,10 @@ case object PostgresDialect extends JdbcDialect { Some(StringType) } else if (sqlType == Types.OTHER && typeName.equals("inet")) { Some(StringType) + } else if (sqlType == Types.OTHER && typeName.equals("json")) { + Some(StringType) + } else if (sqlType == Types.OTHER && typeName.equals("jsonb")) { + Some(StringType) } else None } http://git-wip-us.apache.org/repos/asf/spark/blob/b8f849b5/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index c4b039a..bbf705c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -452,6 +452,13 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext assert(db2Dialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "CHAR(1)") } + test("PostgresDialect type mapping") { + val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") + // SPARK-7869: Testing JSON types handling + assert(Postgres.getCatalystType(java.sql.Types.OTHER, "json", 1, null) === Some(StringType)) + assert(Postgres.getCatalystType(java.sql.Types.OTHER, "jsonb", 1, null) === Some(StringType)) + } + test("table exists query by jdbc dialect") { val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db") val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
